diff --git a/eng/Packages.Data.props b/eng/Packages.Data.props index ad8c084497fe..8b120bb96bec 100644 --- a/eng/Packages.Data.props +++ b/eng/Packages.Data.props @@ -139,8 +139,9 @@ - - + + + @@ -216,7 +217,7 @@ - + @@ -249,7 +250,7 @@ - + diff --git a/sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/tests/Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests.csproj b/sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/tests/Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests.csproj index 2e98289383eb..21519e241851 100644 --- a/sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/tests/Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests.csproj +++ b/sdk/eventgrid/Microsoft.Azure.WebJobs.Extensions.EventGrid/tests/Microsoft.Azure.WebJobs.Extensions.EventGrid.Tests.csproj @@ -16,6 +16,7 @@ + diff --git a/sdk/extensions/Microsoft.Azure.WebJobs.Extensions.Clients/samples/Microsoft.Azure.WebJobs.Extensions.Clients.Samples.csproj b/sdk/extensions/Microsoft.Azure.WebJobs.Extensions.Clients/samples/Microsoft.Azure.WebJobs.Extensions.Clients.Samples.csproj index 046fd70df36b..543f81902d9a 100644 --- a/sdk/extensions/Microsoft.Azure.WebJobs.Extensions.Clients/samples/Microsoft.Azure.WebJobs.Extensions.Clients.Samples.csproj +++ b/sdk/extensions/Microsoft.Azure.WebJobs.Extensions.Clients/samples/Microsoft.Azure.WebJobs.Extensions.Clients.Samples.csproj @@ -1,6 +1,6 @@  - netcoreapp3.1 + net6.0 $(RequiredTargetFrameworks) v3 diff --git a/sdk/extensions/Microsoft.Azure.WebJobs.Extensions.Clients/tests/shared/TestComponentFactory.cs b/sdk/extensions/Microsoft.Azure.WebJobs.Extensions.Clients/tests/shared/TestComponentFactory.cs new file mode 100644 index 000000000000..6102e1ce9b0d --- /dev/null +++ b/sdk/extensions/Microsoft.Azure.WebJobs.Extensions.Clients/tests/shared/TestComponentFactory.cs @@ -0,0 +1,33 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using Azure.Core; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; + +namespace Microsoft.Azure.WebJobs.Host.TestCommon +{ + public class TestComponentFactory : AzureComponentFactory + { + private readonly AzureComponentFactory _factory; + private readonly TokenCredential _tokenCredential; + + public TestComponentFactory(AzureComponentFactory factory, TokenCredential tokenCredential) + { + _factory = factory; + _tokenCredential = tokenCredential; + } + + public override TokenCredential CreateTokenCredential(IConfiguration configuration) + { + return _tokenCredential != null ? _tokenCredential : _factory.CreateTokenCredential(configuration); + } + + public override object CreateClientOptions(Type optionsType, object serviceVersion, IConfiguration configuration) + => _factory.CreateClientOptions(optionsType, serviceVersion, configuration); + + public override object CreateClient(Type clientType, IConfiguration configuration, TokenCredential credential, object clientOptions) + => _factory.CreateClient(clientType, configuration, credential, clientOptions); + } +} \ No newline at end of file diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/api/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.netstandard2.0.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/api/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.netstandard2.0.cs index 6e949d4ca3b4..72b2f32ce4da 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/api/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.netstandard2.0.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/api/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.netstandard2.0.cs @@ -53,5 +53,6 @@ namespace Microsoft.Extensions.Hosting public static partial class StorageBlobsWebJobsBuilderExtensions { public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageBlobs(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, System.Action configureBlobs = null) { throw null; } + public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageBlobsScaleForTrigger(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, Microsoft.Azure.WebJobs.Host.Scale.TriggerMetadata triggerMetadata) { throw null; } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobLogListener.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobLogListener.cs index 133f016050e3..4b4c683df5da 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobLogListener.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobLogListener.cs @@ -27,24 +27,21 @@ internal class BlobLogListener private readonly BlobServiceClient _blobClient; private readonly HashSet _scannedBlobNames = new HashSet(); private readonly StorageAnalyticsLogParser _parser; - private readonly IWebJobsExceptionHandler _exceptionHandler; private readonly ILogger _logger; - private BlobLogListener(BlobServiceClient blobClient, IWebJobsExceptionHandler exceptionHandler, ILogger logger) + private BlobLogListener(BlobServiceClient blobClient, ILogger logger) { _blobClient = blobClient; - _exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _parser = new StorageAnalyticsLogParser(logger); } // This will throw if the client credentials are not valid. - public static async Task CreateAsync(BlobServiceClient blobClient, - IWebJobsExceptionHandler exceptionHandler, ILogger logger, CancellationToken cancellationToken) + public static async Task CreateAsync(BlobServiceClient blobClient, ILogger logger, CancellationToken cancellationToken) { await EnableLoggingAsync(blobClient, cancellationToken).ConfigureAwait(false); - return new BlobLogListener(blobClient, exceptionHandler, logger); + return new BlobLogListener(blobClient, logger); } public async Task>> GetRecentBlobWritesAsync(CancellationToken cancellationToken, diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobScalerMonitorProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobScalerMonitorProvider.cs new file mode 100644 index 000000000000..bdaba188df44 --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobScalerMonitorProvider.cs @@ -0,0 +1,155 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Specialized; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners +{ + internal class BlobScalerMonitorProvider : IScaleMonitorProvider + { + private readonly IScaleMonitor _scaleMonitor; + + public BlobScalerMonitorProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata) + { + AzureComponentFactory azureComponentFactory = null; + if ((triggerMetadata.Properties != null) && (triggerMetadata.Properties.TryGetValue(nameof(AzureComponentFactory), out object value))) + { + azureComponentFactory = value as AzureComponentFactory; + } + else + { + azureComponentFactory = serviceProvider.GetService(); + } + IConfiguration configuration = serviceProvider.GetService(); + AzureEventSourceLogForwarder logForwarder = serviceProvider.GetService(); + var factory = serviceProvider.GetService(); + BlobMetadata blobMetadata = JsonConvert.DeserializeObject(triggerMetadata.Metadata.ToString()); + BlobServiceClientProvider blobServiceClientProvider = new BlobServiceClientProvider(configuration, azureComponentFactory, logForwarder, factory.CreateLogger()); + BlobServiceClient blobServiceClient = blobServiceClientProvider.Get(blobMetadata.Connection, serviceProvider.GetRequiredService()); + _scaleMonitor = new ZeroToOneScaleMonitor(triggerMetadata.FunctionName, blobServiceClient, factory); + } + + public IScaleMonitor GetMonitor() + { + return _scaleMonitor; + } + + private class ZeroToOneScaleMonitor : IScaleMonitor + { + private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; + private readonly Lazy> _blobLogListener; + private readonly ILogger _logger; + private int _threadSafeWritesDetectedValue; + + public ZeroToOneScaleMonitor(string functionId, BlobServiceClient blobServiceClient, ILoggerFactory loggerFactory) + { + _scaleMonitorDescriptor = new ScaleMonitorDescriptor(functionId, functionId); + _blobLogListener = new(() => BlobLogListener.CreateAsync( + blobServiceClient, + loggerFactory.CreateLogger(), + CancellationToken.None)); + _logger = loggerFactory.CreateLogger(); + } + + #pragma warning disable 0649 + // For tests, in PROD the value is always null + private BlobWithContainer _recentWrite; + #pragma warning restore 0649 + + public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor; + + public async Task GetMetricsAsync() + { + // if new blob were detected we want to GetScaleStatus return scale out vote at least once + if (Interlocked.Equals(_threadSafeWritesDetectedValue, 1)) + { + _logger.LogInformation($"New writes were detectd but GetScaleStatus was not called. Waiting GetScaleStatus to call."); + return new ScaleMetrics(); + } + + var blobLogListener = await _blobLogListener.Value.ConfigureAwait(false); + BlobWithContainer[] recentWrites = _recentWrite == null ? (await blobLogListener.GetRecentBlobWritesAsync(CancellationToken.None).ConfigureAwait(false)).ToArray() + : new BlobWithContainer[] { _recentWrite }; + if (recentWrites.Length > 0) + { + StringBuilder stringBuilder = new StringBuilder(); + foreach (var write in recentWrites) + { + stringBuilder.Append($"'{write.BlobClient.Name}', "); + if (stringBuilder.Length > 1000) + { + stringBuilder.Append("[truncated]"); + break; + } + } + _logger.LogInformation($"'{recentWrites.Length}' recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}': {stringBuilder}"); + Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 1, 0); + } + else + { + _logger.LogInformation($"No recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}'"); + Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 0, 1); + } + return new ScaleMetrics(); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return GetScaleStatusCore(context.WorkerCount); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return GetScaleStatusCore(context.WorkerCount); + } + + private ScaleStatus GetScaleStatusCore(int workerCount) + { + // if there is at least one worker we assume all the blobs are added to internal queue and we need to ScaleIn + if (workerCount > 0) + { + // Set to 0 if there is an active worker + Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 0, 1); + } + + ScaleVote vote = ScaleVote.None; + if (workerCount == 0 && _threadSafeWritesDetectedValue == 1) + { + vote = ScaleVote.ScaleOut; + } + else if (workerCount > 0 && _threadSafeWritesDetectedValue == 0) + { + vote = ScaleVote.ScaleIn; + } + else if (workerCount == 0 && _threadSafeWritesDetectedValue == 0) + { + vote = ScaleVote.None; + } + _logger.LogInformation($"Current vote is '{vote}', active workers is '{workerCount}' for '{_scaleMonitorDescriptor.FunctionId}'"); + + return new ScaleStatus() + { + Vote = vote + }; + } + } + + internal class BlobMetadata + { + [JsonProperty] + public string Connection { get; set; } + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerMessage.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerMessage.cs index b5838df7f448..6f18e8ce7714 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerMessage.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobTriggerMessage.cs @@ -28,7 +28,7 @@ public string Type // BlobType enum have different values in track 2 vs track 1, e.g. Block vs BlockBlob. // This internal property makes sure we serialize new type same way track 1 extension did. // This also makes sure we can read both formats since we already shipped few betas and don't want to disturb them. - [JsonProperty("BlobType")] + [JsonProperty(nameof(BlobType))] private string BlobTypeInternal { get { diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs index b77be44d2014..7211c8837b35 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/PollLogsStrategy.cs @@ -78,7 +78,7 @@ public async Task RegisterAsync(BlobServiceClient blobServiceClient, BlobContain if (!_logListeners.ContainsKey(blobServiceClient)) { - BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _exceptionHandler, _logger, cancellationToken).ConfigureAwait(false); + BlobLogListener logListener = await BlobLogListener.CreateAsync(blobServiceClient, _logger, cancellationToken).ConfigureAwait(false); _logListeners.Add(blobServiceClient, logListener); } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs index 84d1b5834cfd..568c6cc2b138 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/StorageBlobsWebJobsBuilderExtensions.cs @@ -11,6 +11,7 @@ using Microsoft.Azure.WebJobs.Extensions.Storage.Common; using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -63,5 +64,21 @@ public static IWebJobsBuilder AddAzureStorageBlobs(this IWebJobsBuilder builder, return builder; } + + /// + /// Adds the Storage Queues extension to the provided . + /// + /// + /// Trigger metadata. + /// + public static IWebJobsBuilder AddAzureStorageBlobsScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata) + { + builder.Services.AddSingleton(serviceProvider => + { + return new BlobScalerMonitorProvider(serviceProvider, triggerMetadata); + }); + + return builder; + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueTargetScaler.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueTargetScaler.cs index 9b30b2914d2b..6da40f64a5b6 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueTargetScaler.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueTargetScaler.cs @@ -64,7 +64,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, int targetWorkerCount = (int)Math.Ceiling(queueLength / (decimal)concurrency); - _logger.LogInformation($"'Target worker count for function '{_functionId}' is '{targetWorkerCount}' (QueueName='{_queueName}', QueueLength ='{queueLength}', Concurrency='{concurrency}')."); + _logger.LogInformation($"Target worker count for function '{_functionId}' is '{targetWorkerCount}' (QueueName='{_queueName}', QueueLength ='{queueLength}', Concurrency='{concurrency}')."); return new TargetScalerResult { TargetWorkerCount = targetWorkerCount diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/api/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.netstandard2.0.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/api/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.netstandard2.0.cs index 6e772ce4cfa5..cbe1abcc3217 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/api/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.netstandard2.0.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/api/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.netstandard2.0.cs @@ -82,5 +82,6 @@ namespace Microsoft.Extensions.Hosting public static partial class StorageQueuesWebJobsBuilderExtensions { public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageQueues(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, System.Action configureQueues = null) { throw null; } + public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddAzureStorageQueuesScaleForTrigger(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, Microsoft.Azure.WebJobs.Host.Scale.TriggerMetadata triggerMetadata) { throw null; } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Listeners/QueueScalerProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Listeners/QueueScalerProvider.cs new file mode 100644 index 000000000000..43fed4f1f619 --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Listeners/QueueScalerProvider.cs @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using Azure.Storage.Queues; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Queues; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners +{ + internal class QueueScalerProvider : IScaleMonitorProvider, ITargetScalerProvider + { + private readonly TriggerMetadata _triggerMetadata; + private readonly IOptions _options; + private readonly ILoggerFactory _loggerFactory; + private readonly QueueMetadata _queueMetadata; + private readonly QueueClient _queueClient; + + public QueueScalerProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata) + { + AzureComponentFactory azureComponentFactory = null; + if ((triggerMetadata.Properties != null) && (triggerMetadata.Properties.TryGetValue(nameof(AzureComponentFactory), out object value))) + { + azureComponentFactory = value as AzureComponentFactory; + } + else + { + azureComponentFactory = serviceProvider.GetService(); + } + + _triggerMetadata = triggerMetadata; + _loggerFactory = serviceProvider.GetService(); + _queueMetadata = JsonConvert.DeserializeObject(_triggerMetadata.Metadata.ToString()); + _queueMetadata.ResolveProperties(serviceProvider.GetService()); + _options = serviceProvider.GetService>(); + + QueueServiceClientProvider queueServiceClientProvider = new QueueServiceClientProvider( + serviceProvider.GetService(), + azureComponentFactory, + serviceProvider.GetService(), + _options, + _loggerFactory, + _loggerFactory.CreateLogger(), + serviceProvider.GetService(), + new SharedQueueWatcher()); + + QueueServiceClient serviceClient = queueServiceClientProvider.Get(_queueMetadata.Connection, serviceProvider.GetService()); + _queueClient = serviceClient.GetQueueClient(_queueMetadata.QueueName); + } + + public IScaleMonitor GetMonitor() + { + return new QueueScaleMonitor( + _triggerMetadata.FunctionName, + _queueClient, + _loggerFactory); + } + + public ITargetScaler GetTargetScaler() + { + return new QueueTargetScaler( + _triggerMetadata.FunctionName, + _queueClient, + _options.Value, + _loggerFactory); + } + + internal class QueueMetadata + { + [JsonProperty] + public string Connection { get; set; } + + [JsonProperty] + public string QueueName { get; set; } + + public void ResolveProperties(INameResolver resolver) + { + if (resolver != null) + { + QueueName = resolver.ResolveWholeString(QueueName); + } + } + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.csproj b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.csproj index 7fe76379ce9f..c05a66126813 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.csproj +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/Microsoft.Azure.WebJobs.Extensions.Storage.Queues.csproj @@ -1,4 +1,4 @@ - + $(RequiredTargetFrameworks) diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/StorageQueuesWebJobsBuilderExtensions.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/StorageQueuesWebJobsBuilderExtensions.cs index 50af1d956e67..57d85a61aae4 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/StorageQueuesWebJobsBuilderExtensions.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/src/StorageQueuesWebJobsBuilderExtensions.cs @@ -7,9 +7,11 @@ using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; using Microsoft.Azure.WebJobs.Extensions.Storage.Queues; using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Config; +using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners; using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Triggers; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Queues; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -63,5 +65,31 @@ public static IWebJobsBuilder AddAzureStorageQueues(this IWebJobsBuilder builder return builder; } + + /// + /// Adds the Storage Queues extension to the provided . + /// + /// + /// Trigger metadata. + /// + public static IWebJobsBuilder AddAzureStorageQueuesScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata) + { + IServiceProvider serviceProvider = null; + Lazy scalerProvider = new Lazy(() => new QueueScalerProvider(serviceProvider, triggerMetadata)); + + builder.Services.AddSingleton(resolvedServiceProvider => + { + serviceProvider = serviceProvider ?? resolvedServiceProvider; + return scalerProvider.Value; + }); + + builder.Services.AddSingleton(resolvedServiceProvider => + { + serviceProvider = serviceProvider ?? resolvedServiceProvider; + return scalerProvider.Value; + }); + + return builder; + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/BlobScaleHostEndToEndTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/BlobScaleHostEndToEndTests.cs new file mode 100644 index 000000000000..de5d287a244b --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/BlobScaleHostEndToEndTests.cs @@ -0,0 +1,191 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Specialized; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; +using Microsoft.Azure.WebJobs.Extensions.Storage.ScenarioTests; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using NUnit.Framework; +using static Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests.QueueScaleHostEndToEndTests; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests +{ + public class BlobScaleHostEndToEndTests : LiveTestBase + { + private const string TestArtifactsPrefix = "e2etest"; + private static AzureStorageEndToEndTests.TestFixture _fixture; + private BlobServiceClient _blobServiceClient; + + private const string Function1Name = "Function1"; + + private string ContainerNameTemaplte1 = "container1%rnd%"; + + private const string BlobConnection1 = "BlobConnection1"; + + [OneTimeSetUp] + public void SetUp() + { + _fixture = new AzureStorageEndToEndTests.TestFixture(TestEnvironment); + _blobServiceClient = _fixture.BlobServiceClient; + } + + [Test] + [TestCase(true, Ignore = "true", IgnoreReason = "The test can take long time.")] + [TestCase(false)] + public async Task BlobScaleHostEndToEndTest(bool writeBlob) + { + RandomNameResolver randomNameResolver = new RandomNameResolver(); + string containerName = randomNameResolver.ResolveInString(ContainerNameTemaplte1); + BlobContainerClient blobContainerClient = _blobServiceClient.GetBlobContainerClient(containerName); + await blobContainerClient.CreateIfNotExistsAsync(); + + string hostJson = + @"{ + ""azureWebJobs"" : { + ""extensions"": { + ""blobs"": { + ""maxDegreeOfParallelism"" : 1, + } + } + } + }"; + + string triggers = $@"{{ +""triggers"": [ + {{ + ""name"": ""myQueueItem"", + ""type"": ""queueTrigger"", + ""direction"": ""in"", + ""path"": ""test"", + ""connection"": ""{BlobConnection1}"", + ""functionName"": ""{Function1Name}"" + }} + ]}}"; + + IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build(); + AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService(); + + string hostId = "test-host"; + var loggerProvider = new TestLoggerProvider(); + + IHostBuilder hostBuilder = new HostBuilder(); + hostBuilder.ConfigureLogging(configure => + { + configure.SetMinimumLevel(LogLevel.Debug); + configure.AddProvider(loggerProvider); + }); + hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) => + { + // Adding host.json here + config.AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(hostJson))); + + var settings = new Dictionary() + { + { BlobConnection1, TestEnvironment.PrimaryStorageAccountConnectionString } + }; + + // Adding app setting + config.AddInMemoryCollection(settings); + }) + .ConfigureServices(services => + { + services.AddAzureStorageScaleServices(); + + services.AddSingleton(); + }) + .ConfigureWebJobsScale((context, builder) => + { + builder.AddAzureStorageBlobs(); //it looks like we do not need this for blobs + builder.UseHostId(hostId); + + foreach (var jtoken in JObject.Parse(triggers)["triggers"]) + { + TriggerMetadata metadata = new TriggerMetadata(jtoken as JObject); + builder.AddAzureStorageBlobsScaleForTrigger(metadata); + } + }, + scaleOptions => + { + scaleOptions.IsTargetScalingEnabled = false; + scaleOptions.MetricsPurgeEnabled = false; + scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4); + scaleOptions.IsRuntimeScalingEnabled = true; + scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1); + }); + + IHost scaleHost = hostBuilder.Build(); + await scaleHost.StartAsync(); + + int timeout = (60 * 1000); // 1 minute + if (writeBlob) + { + // Add new blobs + await blobContainerClient.UploadBlobAsync("test1.txt", new BinaryData("test1")); + await blobContainerClient.UploadBlobAsync("test2.txt", new BinaryData("test2")); + timeout = (60 * 1000) * 60; // 20 minutes + } + else + { + SetRecentWrite(scaleHost, blobContainerClient, true); + } + + // Wait until logs are populated and there is the "scale out" vote + await TestHelpers.Await(async () => + { + IScaleStatusProvider scaleStatusProvider = scaleHost.Services.GetService(); + + var scaleStatus = await scaleStatusProvider.GetScaleStatusAsync(new ScaleStatusContext() { WorkerCount = 0 }); + return scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.FunctionScaleStatuses[Function1Name].Vote == ScaleVote.ScaleOut; + }, timeout); + + if (!writeBlob) + { + SetRecentWrite(scaleHost, blobContainerClient, false); + } + + // Emulate adding a worker, after adding the worker + await TestHelpers.Await(async () => + { + IScaleStatusProvider scaleStatusProvider = scaleHost.Services.GetService(); + + var scaleStatus = await scaleStatusProvider.GetScaleStatusAsync(new ScaleStatusContext() { WorkerCount = 1 }); + return scaleStatus.Vote == ScaleVote.ScaleIn && scaleStatus.FunctionScaleStatuses[Function1Name].Vote == ScaleVote.ScaleIn; + }, timeout); + + // Emulate removing the worker + await TestHelpers.Await(async () => + { + IScaleStatusProvider scaleStatusProvider = scaleHost.Services.GetService(); + + var scaleStatus = await scaleStatusProvider.GetScaleStatusAsync(new ScaleStatusContext() { WorkerCount = 0 }); + return scaleStatus.Vote == ScaleVote.None && scaleStatus.FunctionScaleStatuses[Function1Name].Vote == ScaleVote.None; + }, timeout); + } + + private void SetRecentWrite(IHost scaleHost, BlobContainerClient blobContainerClient, bool setValue) + { + IScaleMonitor zeroToOneScaleMonitor = scaleHost.Services.GetService>(); + var monitorProvider = scaleHost.Services.GetService(); + IScaleMonitor scaleMonitor = monitorProvider.GetMonitor(); + FieldInfo field = scaleMonitor.GetType().GetField("_recentWrite", BindingFlags.NonPublic | BindingFlags.Instance); + var ctor = field.FieldType.GetTypeInfo().GetConstructors(BindingFlags.Public | BindingFlags.Instance).First(); + var instance = setValue ? ctor.Invoke(new object[] { blobContainerClient, blobContainerClient.GetBlobBaseClient("test") }) : null; + field.SetValue(scaleMonitor, instance); + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests.csproj b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests.csproj index aa7b252fada5..4c9e516da28e 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests.csproj +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests.csproj @@ -4,12 +4,10 @@ - - - - - + + + @@ -22,6 +20,7 @@ + diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/QueueScaleHostEndToEndTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/QueueScaleHostEndToEndTests.cs new file mode 100644 index 000000000000..03c99843af42 --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests/tests/QueueScaleHostEndToEndTests.cs @@ -0,0 +1,228 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using Azure.Storage.Queues; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; +using Microsoft.Azure.WebJobs.Extensions.Storage.ScenarioTests; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using NUnit.Framework; +using NUnit.Framework.Internal; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Scenario.Tests +{ + public class QueueScaleHostEndToEndTests : LiveTestBase + { + private const string TestArtifactsPrefix = "e2etest"; + private static AzureStorageEndToEndTests.TestFixture _fixture; + private QueueServiceClient _queueServiceClient; + + private const string Function1Name = "Function1"; + private const string Function2Name = "Function2"; + + private string QueueNameTemaplte1 = "queue1%rnd%"; + private string QueueNameTemaplte2 = "queue2%rnd%"; + + private const string QueueConnection1 = "QueueConnection1"; + private const string QueueConnection2 = "QueueConnection2"; + + [OneTimeSetUp] + public void SetUp() + { + _fixture = new AzureStorageEndToEndTests.TestFixture(TestEnvironment); + _queueServiceClient = _fixture.QueueServiceClient; + } + + [Test] + [TestCase(false)] + [TestCase(true)] + public async Task ScaleHostEndToEndTest(bool tbsEnabled) + { + RandomNameResolver randomNameResolver = new RandomNameResolver(); + string queueName1 = randomNameResolver.ResolveInString(QueueNameTemaplte1); + string queueName2 = randomNameResolver.ResolveInString(QueueNameTemaplte2); + QueueClient client1 = _queueServiceClient.GetQueueClient(queueName1); + await client1.CreateIfNotExistsAsync(); + QueueClient client2 = _queueServiceClient.GetQueueClient(queueName2); + await client2.CreateIfNotExistsAsync(); + + string hostJson = + @"{ + ""azureWebJobs"" : { + ""extensions"": { + ""queues"": { + ""batchSize"" : 1, + ""newBatchThreshold"": 0 + } + } + } + }"; + + // Function1Name uses connection string + // Function2Name uses AzureComponentFactory - simulating managed identity scenario in ScaleController + + string triggers = $@"{{ + ""triggers"": [ + {{ + ""name"": ""myQueueItem"", + ""type"": ""queueTrigger"", + ""direction"": ""in"", + ""queueName"": ""{queueName1}"", + ""connection"": ""{QueueConnection1}"", + ""functionName"": ""{Function1Name}"" + }}, + {{ + ""name"": ""myQueueItem"", + ""type"": ""queueTrigger"", + ""direction"": ""in"", + ""queueName"": ""{queueName2}"", + ""connection"": ""{QueueConnection2}"", + ""functionName"": ""{Function2Name}"" + }} + ]}}"; + + IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build(); + AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService(); + + var container = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary() { { QueueConnection1, TestEnvironment.PrimaryStorageAccountConnectionString } }) + .Build(); + //var credentials = defaultAzureComponentFactory.CreateTokenCredential(container.GetSection(QueueConnection1)); + TestComponentFactory factoryWrapper = new TestComponentFactory(defaultAzureComponentFactory, TestEnvironment.Credential); + + string hostId = "test-host"; + var loggerProvider = new TestLoggerProvider(); + + IHostBuilder hostBuilder = new HostBuilder(); + hostBuilder.ConfigureLogging(configure => + { + configure.SetMinimumLevel(LogLevel.Debug); + configure.AddProvider(loggerProvider); + }); + hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) => + { + // Adding host.json here + config.AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(hostJson))); + + var settings = new Dictionary() + { + { QueueConnection1, TestEnvironment.PrimaryStorageAccountConnectionString }, + { $"{QueueConnection2}:queueServiceUri", $"https://{_fixture.QueueServiceClient.AccountName}.queue.core.windows.net" } + }; + + // Adding app setting + config.AddInMemoryCollection(settings); + }) + .ConfigureServices(services => + { + services.AddAzureStorageScaleServices(); + + services.AddSingleton(); + }) + .ConfigureWebJobsScale((context, builder) => + { + builder.AddAzureStorageQueues(); + builder.UseHostId(hostId); + + foreach (var jtoken in JObject.Parse(triggers)["triggers"]) + { + TriggerMetadata metadata = new TriggerMetadata(jtoken as JObject); + if (metadata.FunctionName == Function2Name) + { + metadata.Properties[nameof(AzureComponentFactory)] = factoryWrapper; + } + builder.AddAzureStorageQueuesScaleForTrigger(metadata); + } + }, + scaleOptions => + { + scaleOptions.IsTargetScalingEnabled = tbsEnabled; + scaleOptions.MetricsPurgeEnabled = false; + scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4); + scaleOptions.IsRuntimeScalingEnabled = true; + scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1); + }); + + IHost scaleHost = hostBuilder.Build(); + await scaleHost.StartAsync(); + + // add some messages to the queue + await client1.SendMessageAsync("test"); + await client1.SendMessageAsync("test"); + await client2.SendMessageAsync("test"); + await client2.SendMessageAsync("test"); + await client2.SendMessageAsync("test"); + + try + { + await TestHelpers.Await(async () => + { + IScaleStatusProvider scaleStatusProvider = scaleHost.Services.GetService(); + + var scaleStatus = await scaleStatusProvider.GetScaleStatusAsync(new ScaleStatusContext()); + + bool scaledOut = false; + if (!tbsEnabled) + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == null + && scaleStatus.FunctionScaleStatuses[Function1Name].Vote == ScaleVote.ScaleOut + && scaleStatus.FunctionScaleStatuses[Function2Name].Vote == ScaleVote.ScaleOut; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains("2 scale monitors to sample", logMessages); + } + } + else + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == 3 + && scaleStatus.FunctionTargetScalerResults[Function1Name].TargetWorkerCount == 2 + && scaleStatus.FunctionTargetScalerResults[Function2Name].TargetWorkerCount == 3; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains("2 target scalers to sample", logMessages); + } + } + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.IsNotEmpty(logMessages.Where(x => x.StartsWith("Runtime scale monitoring is enabled."))); + if (!tbsEnabled) + { + Assert.Contains("Scaling out based on votes", logMessages); + } + } + + return scaledOut; + }, pollingInterval: 2000, timeout: 120000, throwWhenDebugging: true); + } + catch (Exception) + { + // Write scale logs to the output: + var logMessages = loggerProvider.GetAllLogMessages().Where(x => x.Category.Contains("Scale")).Select(p => p.FormattedMessage).ToArray(); + foreach (var logMessage in logMessages) + { + TestContext.WriteLine(logMessage); + } + throw; + } + } + } +}