Skip to content

Commit

Permalink
Add in-process concurrency support to the PS worker (#123)
Browse files Browse the repository at this point in the history
The major changes are:
- Use `PSThreadOptions.ReuseThread` for the `InitialSessionState` when creating `Runspace`, so that every `PowerShellManager` creates only one thread and then reuses it afterwards. The default behavior is to create a new thread every time `PowerShell.Invoke` is called.
- Update `RequestProcessor` to process `InvocationRequest` asynchronously via tasks.
- Implement `PowerShellManagerPool` using `BlockingCollection`
   - make upper bound of the pool configurable via an environment variable `PSWorkerInProcConcurrencyUpperBound`
   - make the pool able to expand in a lazy way
   - check out a `PowerShellManager` via `CheckoutIdleWorker` on the main thread. Once it gets an idle instance back, the main thread queues a task to process the invocation request on a thread-pool thread and forgets about it -- the main thread can then go ahead and process the next message.
- Update the `RpcLogger` and make every `PowerShellManager` have its own logger instance.
   - also update the way the `RequestId` and `InvocationId` are set for the logger. The original way of setting up the context only works for a single-threaded design.
- Update `MessagingStream` to use a `BlockingCollection` to hold all messages that are about to be written out, then use a single thread-pool thread to take out items and write them to the gRPC channel.
   - currently, the way we write out response/log messages is completely task-based/async, using a semaphore for synchronization. However, this approach doesn't guarantee the order of the messages.
   - this is because there could be multiple tasks blocked on the semaphore, and releasing the semaphore allows a blocked task to enter the semaphore, but there is no guaranteed order, such as first-in-first-out, for blocked threads to enter the semaphore.
   - so, the unblocked task could be a random one, which changes the arrival order of the messages when they are written to the gRPC channel.
- Remove the two system log calls we have in our worker, because they drastically worsen the processing time per invocation request when there are a lot of incoming invocation requests.
   - the logging for the "TriggerMetadata" parameter is not that useful, and should be removed
   - the execution time logging is good to have, but not necessary, especially when it impacts the throughput.
  • Loading branch information
daxian-dbw authored Jan 7, 2019
1 parent 6406040 commit 5de474c
Show file tree
Hide file tree
Showing 13 changed files with 180 additions and 213 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ obj/
# Visual Studio IDE directory
.vs/

# VSCode directories that are not at the repository root
/**/.vscode/

# Project Rider IDE files
.idea.powershell/

Expand Down
2 changes: 2 additions & 0 deletions src/Logging/ILogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Utility
/// <summary>
/// Logging contract used by the PowerShell worker.
/// </summary>
internal interface ILogger
{
/// <summary>
/// Log a message at the given level, optionally with an associated exception.
/// </summary>
/// <param name="logLevel">Level of the message.</param>
/// <param name="message">Text of the message.</param>
/// <param name="exception">Optional exception associated with the message; defaults to null.</param>
/// <param name="isUserLog">
/// Defaults to false. NOTE(review): presumably distinguishes user-facing logs from system logs -- confirm against the implementation.
/// </param>
void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false);

/// <summary>
/// Record the given request ID and invocation ID as the logger's current context.
/// </summary>
/// <param name="requestId">ID of the request being processed.</param>
/// <param name="invocationId">ID of the invocation being processed.</param>
void SetContext(string requestId, string invocationId);

/// <summary>
/// Clear the context previously recorded via <see cref="SetContext"/>.
/// </summary>
void ResetContext();
}
}
13 changes: 6 additions & 7 deletions src/Logging/RpcLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,33 @@

namespace Microsoft.Azure.Functions.PowerShellWorker.Utility
{
internal class RpcLogger : ILogger, IDisposable
internal class RpcLogger : ILogger
{
private const string SystemLogPrefix = "LanguageWorkerConsoleLog";
private readonly MessagingStream _msgStream;
private readonly StringBuilder _systemLogMsg;
private string _invocationId;
private string _requestId;

public RpcLogger(MessagingStream msgStream)
internal RpcLogger(MessagingStream msgStream)
{
_msgStream = msgStream;
_systemLogMsg = new StringBuilder();
}

public IDisposable BeginScope(string requestId, string invocationId)
public void SetContext(string requestId, string invocationId)
{
_requestId = requestId;
_invocationId = invocationId;
return this;
}

public void Dispose()
public void ResetContext()
{
_requestId = null;
_invocationId = null;
}

public async void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false)
public void Log(LogLevel logLevel, string message, Exception exception = null, bool isUserLog = false)
{
if (isUserLog)
{
Expand All @@ -56,7 +55,7 @@ public async void Log(LogLevel logLevel, string message, Exception exception = n
}
};

await _msgStream.WriteAsync(logMessage);
_msgStream.Write(logMessage);
}
else
{
Expand Down
60 changes: 29 additions & 31 deletions src/Messaging/MessagingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -12,47 +13,44 @@

namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging
{
internal class MessagingStream : IDisposable
internal class MessagingStream
{
private SemaphoreSlim _writeSemaphore = new SemaphoreSlim(1, 1);
private AsyncDuplexStreamingCall<StreamingMessage, StreamingMessage> _call;
private bool isDisposed;
private readonly AsyncDuplexStreamingCall<StreamingMessage, StreamingMessage> _call;
private readonly BlockingCollection<StreamingMessage> _msgQueue;

public MessagingStream(string host, int port)
internal MessagingStream(string host, int port)
{
Channel channel = new Channel(host, port, ChannelCredentials.Insecure);
_call = new FunctionRpc.FunctionRpcClient(channel).EventStream();
}

public void Dispose()
{
if (!isDisposed)
{
isDisposed = true;
_call.Dispose();
}
_msgQueue = new BlockingCollection<StreamingMessage>();
Task.Run(WriteImplAsync);
}

public StreamingMessage GetCurrentMessage() =>
isDisposed ? null : _call.ResponseStream.Current;

public async Task<bool> MoveNext() =>
!isDisposed && await _call.ResponseStream.MoveNext(CancellationToken.None);

public async Task WriteAsync(StreamingMessage message)
/// <summary>
/// Get the current message.
/// </summary>
internal StreamingMessage GetCurrentMessage() => _call.ResponseStream.Current;

/// <summary>
/// Move to the next message.
/// </summary>
internal async Task<bool> MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None);

/// <summary>
/// Write the outgoing message to the buffer.
/// </summary>
internal void Write(StreamingMessage message) => _msgQueue.Add(message);

/// <summary>
/// Take a message from the buffer and write to the gRPC channel.
/// </summary>
private async Task WriteImplAsync()
{
if(isDisposed) return;

// Wait for the handle to be released because we can't have
// more than one message being sent at the same time
await _writeSemaphore.WaitAsync();
try
{
await _call.RequestStream.WriteAsync(message);
}
finally
while (true)
{
_writeSemaphore.Release();
StreamingMessage msg = _msgQueue.Take();
await _call.RequestStream.WriteAsync(msg);
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/PowerShell/PowerShellManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ internal class PowerShellManager
/// </summary>
internal Guid InstanceId => _pwsh.Runspace.InstanceId;

/// <summary>
/// Gets the associated logger.
/// </summary>
internal ILogger Logger => _logger;

static PowerShellManager()
{
// Set the type accelerators for 'HttpResponseContext' and 'HttpRequestContext'.
Expand All @@ -48,6 +53,7 @@ internal PowerShellManager(ILogger logger)
}

var initialSessionState = InitialSessionState.CreateDefault();
initialSessionState.ThreadOptions = PSThreadOptions.ReuseThread;
initialSessionState.EnvironmentVariables.Add(
new SessionStateVariableEntry("PSModulePath", FunctionLoader.FunctionModulePath, null));

Expand Down Expand Up @@ -154,16 +160,11 @@ internal Hashtable InvokeFunction(
// Gives access to additional Trigger Metadata if the user specifies TriggerMetadata
if(functionInfo.FuncParameters.Contains(AzFunctionInfo.TriggerMetadata))
{
_logger.Log(LogLevel.Debug, "Parameter '-TriggerMetadata' found.");
_pwsh.AddParameter(AzFunctionInfo.TriggerMetadata, triggerMetadata);
}

Collection<object> pipelineItems = null;
using (ExecutionTimer.Start(_logger, "Execution of the user's function completed."))
{
pipelineItems = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject")
.InvokeAndClearCommands<object>();
}
Collection<object> pipelineItems = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Trace-PipelineObject")
.InvokeAndClearCommands<object>();

var result = _pwsh.AddCommand("Microsoft.Azure.Functions.PowerShellWorker\\Get-OutputBinding")
.AddParameter("Purge", true)
Expand Down
78 changes: 64 additions & 14 deletions src/PowerShell/PowerShellManagerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
//

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.PowerShellWorker.Messaging;
using Microsoft.Azure.Functions.PowerShellWorker.Utility;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;

namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell
{
Expand All @@ -15,37 +20,80 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.PowerShell
/// </summary>
internal class PowerShellManagerPool
{
private readonly ILogger _logger;
// Today we don't really support the in-proc concurrency. We just hold an instance of PowerShellManager in this field.
private PowerShellManager _psManager;
private readonly int _upperBound;
private readonly MessagingStream _msgStream;
private readonly BlockingCollection<PowerShellManager> _pool;
private int _poolSize;

/// <summary>
/// Constructor of the pool.
/// </summary>
internal PowerShellManagerPool(ILogger logger)
internal PowerShellManagerPool(MessagingStream msgStream)
{
_logger = logger;
string upperBound = Environment.GetEnvironmentVariable("PSWorkerInProcConcurrencyUpperBound");
if (string.IsNullOrEmpty(upperBound) || !int.TryParse(upperBound, out _upperBound))
{
_upperBound = 1;
}

_msgStream = msgStream;
_pool = new BlockingCollection<PowerShellManager>(_upperBound);
}

/// <summary>
/// Initialize the pool and populate it with PowerShellManager instances.
/// When it's time to really implement this pool, we probably should instantiate PowerShellManager instances in a lazy way.
/// Maybe start from size 1 and increase the number of workers as needed.
/// We instantiate PowerShellManager instances in a lazy way, starting from size 1 and increase the number of workers as needed.
/// </summary>
internal void Initialize()
internal void Initialize(string requestId)
{
_psManager = new PowerShellManager(_logger);
var logger = new RpcLogger(_msgStream);

try
{
logger.SetContext(requestId, invocationId: null);
_pool.Add(new PowerShellManager(logger));
_poolSize = 1;
}
finally
{
logger.ResetContext();
}
}

/// <summary>
/// Checkout an idle PowerShellManager instance.
/// When it's time to really implement this pool, this method is supposed to block when there is no idle instance available.
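/// Checkout an idle PowerShellManager instance. A new instance is created lazily while the pool is below its upper bound;
/// otherwise the calling thread blocks until an idle instance becomes available.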
/// </summary>
internal PowerShellManager CheckoutIdleWorker(AzFunctionInfo functionInfo)
internal PowerShellManager CheckoutIdleWorker(StreamingMessage request, AzFunctionInfo functionInfo)
{
PowerShellManager psManager = null;
string requestId = request.RequestId;
string invocationId = request.InvocationRequest?.InvocationId;

// If the pool has an idle one, just use it.
if (!_pool.TryTake(out psManager))
{
// The pool doesn't have an idle one.
if (_poolSize < _upperBound &&
Interlocked.Increment(ref _poolSize) <= _upperBound)
{
// If the pool hasn't reached its bounded capacity yet, then
// we create a new item and return it.
var logger = new RpcLogger(_msgStream);
logger.SetContext(requestId, invocationId);
psManager = new PowerShellManager(logger);
}
else
{
// If the pool has reached its bounded capacity, then the thread
// should be blocked until an idle one becomes available.
psManager = _pool.Take();
}
}

// Register the function with the Runspace before returning the idle PowerShellManager.
FunctionMetadata.RegisterFunctionMetadata(_psManager.InstanceId, functionInfo);
return _psManager;
FunctionMetadata.RegisterFunctionMetadata(psManager.InstanceId, functionInfo);
psManager.Logger.SetContext(requestId, invocationId);
return psManager;
}

/// <summary>
Expand All @@ -57,6 +105,8 @@ internal void ReclaimUsedWorker(PowerShellManager psManager)
{
// Unregister the Runspace before reclaiming the used PowerShellManager.
FunctionMetadata.UnregisterFunctionMetadata(psManager.InstanceId);
psManager.Logger.ResetContext();
_pool.Add(psManager);
}
}
}
Expand Down
Loading

0 comments on commit 5de474c

Please sign in to comment.