Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions eng/Packages.Data.props
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@
<PackageReference Update="Microsoft.Azure.SignalR.Management" Version="1.19.2" />
<PackageReference Update="Microsoft.Azure.SignalR.Protocols" Version="1.19.2" />
<PackageReference Update="Microsoft.Azure.SignalR.Serverless.Protocols" Version="1.9.0" />
<PackageReference Update="Microsoft.Azure.WebJobs" Version="3.0.36" />
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.36" />
<PackageReference Update="Microsoft.Azure.WebJobs" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Host.Storage" Version="5.0.0" />
<PackageReference Update="Microsoft.Spatial" Version="7.5.3" />
<PackageReference Update="Newtonsoft.Json" Version="10.0.3" />
</ItemGroup>
Expand Down Expand Up @@ -216,7 +217,7 @@
<PackageReference Update="Microsoft.AspNetCore.Server.Kestrel" Version="2.1.3" />
<PackageReference Update="Microsoft.AspNetCore.Server.Kestrel.Core" Version="2.1.25" />
<PackageReference Update="Microsoft.AspNetCore.Server.WebListener" Version="1.1.4" />
<PackageReference Update="Microsoft.AspNetCore.Http" Version="2.1.22" />
<PackageReference Update="Microsoft.AspNetCore.Http" Version="2.2.2" />
<PackageReference Update="Microsoft.Azure.Core.Spatial" Version="1.0.0" />
<PackageReference Update="Microsoft.Azure.Core.NewtonsoftJson" Version="1.0.0" />
<PackageReference Update="Microsoft.Azure.Devices" Version="1.38.2" />
Expand Down Expand Up @@ -249,7 +250,7 @@
<PackageReference Update="Microsoft.Extensions.PlatformAbstractions" Version="1.1.0" />
<PackageReference Update="Microsoft.Graph" Version="4.52.0" />
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="17.0.0" />
<PackageReference Update="Microsoft.NET.Sdk.Functions" Version="3.0.12" />
<PackageReference Update="Microsoft.NET.Sdk.Functions" Version="4.2.0" />
<PackageReference Update="Microsoft.Rest.ClientRuntime.Azure.Authentication" Version="[2.4.0]" />
<PackageReference Update="Microsoft.Rest.ClientRuntime.Azure.TestFramework" Version="[1.7.7, 2.0.0)" />
<PackageReference Update="Microsoft.ServiceFabric.Data" Version="3.3.624" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

<ItemGroup>
<ProjectReference Include="$(AzureCoreTestFramework)" />
<ProjectReference Include="..\..\..\extensions\Microsoft.Extensions.Azure\src\Microsoft.Extensions.Azure.csproj" />
<ProjectReference Include="..\src\Microsoft.Azure.WebJobs.Extensions.EventGrid.csproj" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<RequiredTargetFrameworks>netcoreapp3.1</RequiredTargetFrameworks>
<RequiredTargetFrameworks>net6.0</RequiredTargetFrameworks>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
<AzureFunctionsVersion>v3</AzureFunctionsVersion>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using Azure.Core;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Azure.WebJobs.Host.TestCommon
{
public class TestComponentFactory : AzureComponentFactory
{
private readonly AzureComponentFactory _factory;
private readonly TokenCredential _tokenCredential;

public TestComponentFactory(AzureComponentFactory factory, TokenCredential tokenCredential)
{
_factory = factory;
_tokenCredential = tokenCredential;
}

public override TokenCredential CreateTokenCredential(IConfiguration configuration)
{
return _tokenCredential != null ? _tokenCredential : _factory.CreateTokenCredential(configuration);
}

public override object CreateClientOptions(Type optionsType, object serviceVersion, IConfiguration configuration)
=> _factory.CreateClientOptions(optionsType, serviceVersion, configuration);

public override object CreateClient(Type clientType, IConfiguration configuration, TokenCredential credential, object clientOptions)
=> _factory.CreateClient(clientType, configuration, credential, clientOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,6 @@ namespace Microsoft.Extensions.Hosting
public static partial class StorageBlobsWebJobsBuilderExtensions
{
public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageBlobs(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, System.Action<Microsoft.Azure.WebJobs.Host.BlobsOptions> configureBlobs = null) { throw null; }
public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageBlobsScaleForTrigger(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, Microsoft.Azure.WebJobs.Host.Scale.TriggerMetadata triggerMetadata) { throw null; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,21 @@ internal class BlobLogListener
private readonly BlobServiceClient _blobClient;
private readonly HashSet<string> _scannedBlobNames = new HashSet<string>();
private readonly StorageAnalyticsLogParser _parser;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly ILogger<BlobListener> _logger;

private BlobLogListener(BlobServiceClient blobClient, IWebJobsExceptionHandler exceptionHandler, ILogger<BlobListener> logger)
private BlobLogListener(BlobServiceClient blobClient, ILogger<BlobListener> logger)
{
_blobClient = blobClient;
_exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
Comment thread
amnguye marked this conversation as resolved.

_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_parser = new StorageAnalyticsLogParser(logger);
}

// This will throw if the client credentials are not valid.
public static async Task<BlobLogListener> CreateAsync(BlobServiceClient blobClient,
IWebJobsExceptionHandler exceptionHandler, ILogger<BlobListener> logger, CancellationToken cancellationToken)
public static async Task<BlobLogListener> CreateAsync(BlobServiceClient blobClient, ILogger<BlobListener> logger, CancellationToken cancellationToken)
{
await EnableLoggingAsync(blobClient, cancellationToken).ConfigureAwait(false);
return new BlobLogListener(blobClient, exceptionHandler, logger);
return new BlobLogListener(blobClient, logger);
}

public async Task<IEnumerable<BlobWithContainer<BlobBaseClient>>> GetRecentBlobWritesAsync(CancellationToken cancellationToken,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
internal class BlobScalerMonitorProvider : IScaleMonitorProvider
{
private readonly IScaleMonitor _scaleMonitor;

public BlobScalerMonitorProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata)
{
AzureComponentFactory azureComponentFactory = null;
if ((triggerMetadata.Properties != null) && (triggerMetadata.Properties.TryGetValue(nameof(AzureComponentFactory), out object value)))
Comment thread
alrod marked this conversation as resolved.
{
azureComponentFactory = value as AzureComponentFactory;
}
else
{
azureComponentFactory = serviceProvider.GetService<AzureComponentFactory>();
}
IConfiguration configuration = serviceProvider.GetService<IConfiguration>();
AzureEventSourceLogForwarder logForwarder = serviceProvider.GetService<AzureEventSourceLogForwarder>();
var factory = serviceProvider.GetService<ILoggerFactory>();
BlobMetadata blobMetadata = JsonConvert.DeserializeObject<BlobMetadata>(triggerMetadata.Metadata.ToString());
BlobServiceClientProvider blobServiceClientProvider = new BlobServiceClientProvider(configuration, azureComponentFactory, logForwarder, factory.CreateLogger<BlobServiceClient>());
BlobServiceClient blobServiceClient = blobServiceClientProvider.Get(blobMetadata.Connection, serviceProvider.GetRequiredService<INameResolver>());
_scaleMonitor = new ZeroToOneScaleMonitor(triggerMetadata.FunctionName, blobServiceClient, factory);
}

public IScaleMonitor GetMonitor()
{
return _scaleMonitor;
}

private class ZeroToOneScaleMonitor : IScaleMonitor<ScaleMetrics>
{
private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
private readonly Lazy<Task<BlobLogListener>> _blobLogListener;
private readonly ILogger _logger;
private int _threadSafeWritesDetectedValue;

public ZeroToOneScaleMonitor(string functionId, BlobServiceClient blobServiceClient, ILoggerFactory loggerFactory)
{
_scaleMonitorDescriptor = new ScaleMonitorDescriptor(functionId, functionId);
_blobLogListener = new(() => BlobLogListener.CreateAsync(blobServiceClient, loggerFactory.CreateLogger<BlobListener>(), CancellationToken.None));
Comment thread
alrod marked this conversation as resolved.
Outdated
_logger = loggerFactory.CreateLogger<ZeroToOneScaleMonitor>();
}

public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor;

public async Task<ScaleMetrics> GetMetricsAsync()
{
// if new blob were detected we want to GetScaleStatus return scale out vote at least once
if (Interlocked.Equals(_threadSafeWritesDetectedValue, 1))
{
_logger.LogInformation($"New writes were detectd but GetScaleStatus was not called. Waiting GetScaleStatus to call.");
return new ScaleMetrics();
}

var blobLogListener = await _blobLogListener.Value.ConfigureAwait(false);
BlobWithContainer<BlobBaseClient>[] recentWrites = (await blobLogListener.GetRecentBlobWritesAsync(CancellationToken.None).ConfigureAwait(false)).ToArray();
if (recentWrites.Length > 0)
{
StringBuilder stringBuilder = new StringBuilder();
foreach (var write in recentWrites)
{
stringBuilder.Append($"'{write.BlobClient.Name}', ");
if (stringBuilder.Length > 1000)
{
stringBuilder.Append("[truncated]");
break;
}
}
_logger.LogInformation($"'{recentWrites.Length}' recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}': {stringBuilder}");
Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 1, 0);
}
else
{
_logger.LogInformation($"No recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}'");
Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 0, 1);
}
return new ScaleMetrics();
}

public ScaleStatus GetScaleStatus(ScaleStatusContext context)
{
return GetScaleStatusCore(context.WorkerCount);
}

public ScaleStatus GetScaleStatus(ScaleStatusContext<ScaleMetrics> context)
{
return GetScaleStatusCore(context.WorkerCount);
}

private ScaleStatus GetScaleStatusCore(int workerCount)
{
// if there is at least one worker we assume all the blobs are added to internal queue and we need to ScaleIn
if (workerCount > 0)
{
// Set to 0 if there is an active worker
Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 0, 1);
}

ScaleVote vote = ScaleVote.None;
if (workerCount == 0 && _threadSafeWritesDetectedValue == 1)
{
vote = ScaleVote.ScaleOut;
}
else if (workerCount > 0 && _threadSafeWritesDetectedValue == 0)
{
vote = ScaleVote.ScaleIn;
}
else if (workerCount == 0 && _threadSafeWritesDetectedValue == 0)
{
vote = ScaleVote.None;
}
_logger.LogInformation($"Current vote is '{vote}', active workers is '{workerCount}' for '{_scaleMonitorDescriptor.FunctionId}'");

return new ScaleStatus()
{
Vote = vote
};
}
}

internal class BlobMetadata
{
[JsonProperty]
public string Connection { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public string Type
// BlobType enum have different values in track 2 vs track 1, e.g. Block vs BlockBlob.
// This internal property makes sure we serialize new type same way track 1 extension did.
// This also makes sure we can read both formats since we already shipped few betas and don't want to disturb them.
[JsonProperty("BlobType")]
[JsonProperty(nameof(BlobType))]
private string BlobTypeInternal {
get
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContain

if (!_logListeners.ContainsKey(blobServiceClient))
{
BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _exceptionHandler, _logger, cancellationToken).ConfigureAwait(false);
BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _logger, cancellationToken).ConfigureAwait(false);
_logListeners.Add(blobServiceClient, logListener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Microsoft.Azure.WebJobs.Extensions.Storage.Common;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
Expand Down Expand Up @@ -63,5 +64,21 @@ public static IWebJobsBuilder AddAzureStorageBlobs(this IWebJobsBuilder builder,

return builder;
}

/// <summary>
/// Adds the Storage Queues extension to the provided <see cref="IWebJobsBuilder"/>.
/// </summary>
/// <param name="builder"></param>
/// <param name="triggerMetadata">Trigger metadata.</param>
/// <returns></returns>
public static IWebJobsBuilder AddAzureStorageBlobsScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
builder.Services.AddSingleton<IScaleMonitorProvider>(serviceProvider =>
{
return new BlobScalerMonitorProvider(serviceProvider, triggerMetadata);
});

return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context,

int targetWorkerCount = (int)Math.Ceiling(queueLength / (decimal)concurrency);

_logger.LogInformation($"'Target worker count for function '{_functionId}' is '{targetWorkerCount}' (QueueName='{_queueName}', QueueLength ='{queueLength}', Concurrency='{concurrency}').");
_logger.LogInformation($"Target worker count for function '{_functionId}' is '{targetWorkerCount}' (QueueName='{_queueName}', QueueLength ='{queueLength}', Concurrency='{concurrency}').");
return new TargetScalerResult
{
TargetWorkerCount = targetWorkerCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,6 @@ namespace Microsoft.Extensions.Hosting
public static partial class StorageQueuesWebJobsBuilderExtensions
{
public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageQueues(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, System.Action<Microsoft.Azure.WebJobs.Host.QueuesOptions> configureQueues = null) { throw null; }
public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageQueuesScaleForTrigger(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, Microsoft.Azure.WebJobs.Host.Scale.TriggerMetadata triggerMetadata) { throw null; }
}
}
Loading