Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do not merged]Buffered Writes to rpc channel and async everywhere #192

Draft
wants to merge 6 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Eventing/EventSources.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.Functions.PowerShellWorker
{
internal static class EventSources
{
public const string ScriptFiles = "ScriptFiles";
public const string Rpc = "Rpc";
public const string Worker = "Worker";
public const string WorkerProcess = "WorkerProcess";
}
}
12 changes: 12 additions & 0 deletions src/Eventing/IScriptEventManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;

namespace Microsoft.Azure.Functions.PowerShellWorker
{
internal interface IScriptEventManager : IObservable<ScriptEvent>
{
void Publish(ScriptEvent scriptEvent);
}
}
17 changes: 17 additions & 0 deletions src/Eventing/Rpc/InboundEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Grpc.Core;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;

namespace Microsoft.Azure.Functions.PowerShellWorker
{
internal class InboundEvent : RpcEvent
{
public IAsyncStreamReader<StreamingMessage> requestStream;

public InboundEvent(string workerId, StreamingMessage message) : base(workerId, message, MessageOrigin.Worker)
{
}
}
}
13 changes: 13 additions & 0 deletions src/Eventing/Rpc/OutboundEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;

namespace Microsoft.Azure.Functions.PowerShellWorker
{
internal class OutboundEvent : RpcEvent
{
public OutboundEvent(string workerId, StreamingMessage message) : base(workerId, message, MessageOrigin.Host)
{
}
}
}
31 changes: 31 additions & 0 deletions src/Eventing/Rpc/RpcEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;

namespace Microsoft.Azure.Functions.PowerShellWorker
{
internal class RpcEvent : ScriptEvent
{
internal RpcEvent(string workerId, StreamingMessage message, MessageOrigin origin = MessageOrigin.Host)
: base(message.ContentCase.ToString(), EventSources.Rpc)
{
Message = message;
Origin = origin;
WorkerId = workerId;
}

public enum MessageOrigin
{
Worker,
Host
}

public MessageOrigin Origin { get; }

public StreamingMessage.ContentOneofCase MessageType => Message.ContentCase;

public string WorkerId { get; }

public StreamingMessage Message { get; }
}
}
18 changes: 18 additions & 0 deletions src/Eventing/ScriptEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.Functions.PowerShellWorker
{
internal class ScriptEvent
{
public ScriptEvent(string name, string source)
{
Name = name;
Source = source;
}

public string Name { get; }

public string Source { get; }
}
}
51 changes: 51 additions & 0 deletions src/Eventing/ScriptEventManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Reactive.Subjects;

namespace Microsoft.Azure.Functions.PowerShellWorker
{
internal sealed class ScriptEventManager : IScriptEventManager, IDisposable
{
private readonly Subject<ScriptEvent> _subject = new Subject<ScriptEvent>();
private bool _disposed = false;

public void Publish(ScriptEvent scriptEvent)
{
ThrowIfDisposed();

_subject.OnNext(scriptEvent);
}

public IDisposable Subscribe(IObserver<ScriptEvent> observer)
{
ThrowIfDisposed();

return _subject.Subscribe(observer);
}

private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ScriptEventManager));
}
}

private void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
_subject.Dispose();
}

_disposed = true;
}
}

public void Dispose() => Dispose(true);
}
}
2 changes: 1 addition & 1 deletion src/Logging/RpcLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void Log(LogLevel logLevel, string message, Exception exception = null, b
}
};

_msgStream.Write(logMessage);
_msgStream.AddToBlockingQueue(logMessage);
}
else
{
Expand Down
35 changes: 20 additions & 15 deletions src/Messaging/MessagingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -15,7 +16,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Messaging
internal class MessagingStream
{
private readonly AsyncDuplexStreamingCall<StreamingMessage, StreamingMessage> _call;
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private BlockingCollection<StreamingMessage> _blockingCollectionQueue = new BlockingCollection<StreamingMessage>();

internal MessagingStream(string host, int port)
{
Expand All @@ -28,30 +29,34 @@ internal MessagingStream(string host, int port)
/// </summary>
internal StreamingMessage GetCurrentMessage() => _call.ResponseStream.Current;

internal void AddToBlockingQueue(StreamingMessage streamingMessage)
{
_blockingCollectionQueue.Add(streamingMessage);
}

/// <summary>
/// Move to the next message.
/// </summary>
internal async Task<bool> MoveNext() => await _call.ResponseStream.MoveNext(CancellationToken.None);

/// <summary>
/// Write the outgoing message.
/// </summary>
internal void Write(StreamingMessage message) => WriteImplAsync(message).ConfigureAwait(false);

/// <summary>
/// Take a message from the buffer and write to the gRPC channel.
/// </summary>
private async Task WriteImplAsync(StreamingMessage message)
internal Task Write()
{
try
{
await _semaphoreSlim.WaitAsync();
await _call.RequestStream.WriteAsync(message);
}
finally
var consumer = Task.Run(async () =>
{
_semaphoreSlim.Release();
}
foreach (var rpcWriteMsg in _blockingCollectionQueue.GetConsumingEnumerable())
{
await _call.RequestStream.WriteAsync(rpcWriteMsg);
}
});
return consumer;
}

internal Task Write(StreamingMessage msg)
{
return _call.RequestStream.WriteAsync(msg);
}
}
}
1 change: 1 addition & 0 deletions src/Microsoft.Azure.Functions.PowerShellWorker.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Licensed under the MIT license. See LICENSE file in the project root for full li
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="CommandLineParser" Version="2.3.0" />
<PackageReference Include="Google.Protobuf" Version="3.6.1" />
<PackageReference Include="System.Reactive" Version="4.1.5" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading