diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs index 6048233422a5..8e303b16edd9 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs @@ -68,5 +68,6 @@ public static partial class EventHubWebJobsBuilderExtensions { public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddEventHubs(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { throw null; } public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddEventHubs(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, System.Action configure) { throw null; } + public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddEventHubsScaleForTrigger(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, Microsoft.Azure.WebJobs.Host.Scale.TriggerMetadata triggerMetadata) { throw null; } } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs index 1932fcce7d01..54dfcb739259 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs @@ -2,12 +2,13 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Globalization; using System.Net; using Azure.Messaging.EventHubs.Consumer; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.EventHubs; +using Microsoft.Azure.WebJobs.EventHubs.Listeners; using Microsoft.Azure.WebJobs.EventHubs.Processor; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -91,6 +92,26 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action< return builder; } + public static IWebJobsBuilder AddEventHubsScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata) + { + IServiceProvider serviceProvider = null; + Lazy scalerProvider = new Lazy(() => new EventHubsScalerProvider(serviceProvider, triggerMetadata)); + + builder.Services.AddSingleton(resolvedServiceProvider => + { + serviceProvider = serviceProvider ?? resolvedServiceProvider; + return scalerProvider.Value; + }); + + builder.Services.AddSingleton(resolvedServiceProvider => + { + serviceProvider = serviceProvider ?? resolvedServiceProvider; + return scalerProvider.Value; + }); + + return builder; + } + internal static void ConfigureOptions(EventHubOptions options) { OffsetType? type = options?.InitialOffsetOptions?.Type; diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsScalerProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsScalerProvider.cs new file mode 100644 index 000000000000..517d2b81659a --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsScalerProvider.cs @@ -0,0 +1,104 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Storage.Blobs; +using Microsoft.Azure.WebJobs.EventHubs.Processor; +using Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.EventHubs.Listeners +{ + internal class EventHubsScalerProvider : IScaleMonitorProvider, ITargetScalerProvider + { + private readonly IScaleMonitor _scaleMonitor; + private readonly ITargetScaler _targetScaler; + + public EventHubsScalerProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata) + { + AzureComponentFactory azureComponentFactory; + if ((triggerMetadata.Properties != null) && (triggerMetadata.Properties.TryGetValue(nameof(AzureComponentFactory), out object value))) + { + azureComponentFactory = value as AzureComponentFactory; + } + else + { + azureComponentFactory = serviceProvider.GetService(); + } + + var configuration = serviceProvider.GetService(); + var hostComponentFactory = serviceProvider.GetService(); + var logForwarder = serviceProvider.GetService(); + var options = serviceProvider.GetService>(); + var loggerFactory = serviceProvider.GetService(); + var checkpointClientProvider = new CheckpointClientProvider(configuration, azureComponentFactory, logForwarder, loggerFactory.CreateLogger()); + var nameResolver = serviceProvider.GetService(); + var eventHubMetadata = JsonConvert.DeserializeObject(triggerMetadata.Metadata.ToString()); + var factory = new EventHubClientFactory(configuration, hostComponentFactory, options, nameResolver, logForwarder, checkpointClientProvider); + eventHubMetadata.ResolveProperties(serviceProvider.GetService()); + var eventHubConsumerClient = factory.GetEventHubConsumerClient(eventHubMetadata.EventHubName, eventHubMetadata.Connection, eventHubMetadata.ConsumerGroup); + var checkpointStore = new BlobCheckpointStoreInternal( + factory.GetCheckpointStoreClient(), + triggerMetadata.FunctionName, + loggerFactory.CreateLogger()); + var eventHubMerticsProvider = new EventHubMetricsProvider( + triggerMetadata.FunctionName, + eventHubConsumerClient, + checkpointStore, + loggerFactory.CreateLogger() + ); + + _scaleMonitor = new EventHubsScaleMonitor( + triggerMetadata.FunctionName, + eventHubConsumerClient, + checkpointStore, + loggerFactory.CreateLogger()); + + _targetScaler = new EventHubsTargetScaler( + triggerMetadata.FunctionName, + eventHubConsumerClient, + options.Value, + eventHubMerticsProvider, + loggerFactory.CreateLogger()); + } + + public IScaleMonitor GetMonitor() + { + return _scaleMonitor; + } + + public ITargetScaler GetTargetScaler() + { + return _targetScaler; + } + + private class EventHubMetadata + { + [JsonProperty] + public string EventHubName { get; set; } + + [JsonProperty] + public string ConsumerGroup { get; set; } + + [JsonProperty] + public string Connection { get; set; } + + public void ResolveProperties(INameResolver resolver) + { + if (resolver != null) + { + EventHubName = resolver.ResolveWholeString(EventHubName); + ConsumerGroup = resolver.ResolveWholeString(ConsumerGroup); + } + } + } + } +} diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsTargetScaler.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsTargetScaler.cs index 008a09f5c46c..b49755406cb9 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsTargetScaler.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubsTargetScaler.cs @@ -95,7 +95,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, _logger.LogInformation($"Desired target worker count of '{desiredWorkerCount}' is not in list of valid sorted workers: '{string.Join(",", sortedValidWorkerCounts)}'. Using next largest valid worker as target worker count."); } - _logger.LogInformation($"'Target worker count for function '{_functionId}' is '{validatedTargetWorkerCount}' (EventHubName='{_client.EventHubName}', EventCount ='{eventCount}', Concurrency='{desiredConcurrency}', PartitionCount='{partitionCount}')."); + _logger.LogInformation($"Target worker count for function '{_functionId}' is '{validatedTargetWorkerCount}' (EventHubName='{_client.EventHubName}', EventCount ='{eventCount}', Concurrency='{desiredConcurrency}', PartitionCount='{partitionCount}')."); return new TargetScalerResult { diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj index cca0c806f8da..f2733d8c5b18 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj @@ -18,6 +18,7 @@ + diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs new file mode 100644 index 000000000000..4cc6690d05bd --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/ScaleHostEndToEndTests.cs @@ -0,0 +1,217 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core.TestFramework; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Producer; +using Azure.Messaging.EventHubs.Tests; +using Azure.Storage.Blobs; +using Microsoft.Azure.WebJobs.EventHubs; +using Microsoft.Azure.WebJobs.Host.EndToEndTests; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using NUnit.Framework; + +namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests +{ + [NonParallelizable] + [LiveOnly(true)] + public class ScaleHostEndToEndTests : WebJobsEventHubTestBase + { + private const string Function1Name = "Function1"; + private const string Function2Name = "Function2"; + + private const string EventHubConnection1 = "EventHubConnection1"; + private const string EventHubConnection2 = "EventHubConnection2"; + + /// + /// Performs the tasks needed to initialize each test. This + /// method runs once for the each test prior to running it. + /// + /// + [SetUp] + public new async Task BaseSetUp() + { + _eventHubScope = await EventHubScope.CreateAsync(2, new List() { "ConsumerGroup" }); + } + + public ScaleHostEndToEndTests() : base() + { + } + + [Test] + [TestCase(false)] + [TestCase(true)] + public async Task ScaleHostEndToEndTest(bool tbsEnabled) + { + string hostJson = + @"{ + ""azureWebJobs"" : { + ""extensions"": { + ""eventHubs"": { + ""targetUnprocessedEventThreshold"": 1 + } + } + } + }"; + + // Function1Name uses connection string + // Function2Name uses AzureComponentFactory - simulating managed identity scenario in ScaleController + string triggers = $@"{{ +""triggers"": [ + {{ + ""name"": ""myQueueItem"", + ""type"": ""eventHubsTrigger"", + ""direction"": ""in"", + ""eventHubName"": ""{_eventHubScope.EventHubName}"", + ""consumerGroup"": ""{_eventHubScope.ConsumerGroups[0]}"", + ""connection"": ""{EventHubConnection1}"", + ""functionName"": ""{Function1Name}"" + }}, + {{ + ""name"": ""myQueueItem"", + ""type"": ""serviceBusTrigger"", + ""direction"": ""in"", + ""eventHubName"": ""{_eventHubScope.EventHubName}"", + ""consumerGroup"": ""{_eventHubScope.ConsumerGroups[1]}"", + ""connection"": ""{EventHubConnection2}"", + ""functionName"": ""{Function2Name}"" + }} + ]}}"; + + IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build(); + AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService(); + TestComponentFactory factoryWrapper = new TestComponentFactory(defaultAzureComponentFactory, EventHubsTestEnvironment.Instance.Credential); + + string hostId = "test-host"; + var loggerProvider = new TestLoggerProvider(); + + IHostBuilder hostBuilder = new HostBuilder(); + hostBuilder.ConfigureLogging(configure => + { + configure.SetMinimumLevel(LogLevel.Debug); + configure.AddProvider(loggerProvider); + }); + hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) => + { + // Adding host.json here + config.AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(hostJson))); + + var settings = new Dictionary() + { + { $"{EventHubConnection1}", EventHubsTestEnvironment.Instance.EventHubsConnectionString }, + { $"{EventHubConnection2}:fullyQualifiedNamespace", $"{EventHubsTestEnvironment.Instance.EventHubsNamespace}.servicebus.windows.net" }, + { "AzureWebJobsStorage", StorageTestEnvironment.Instance.StorageConnectionString } + }; + + // Adding app setting + config.AddInMemoryCollection(settings); + }) + .ConfigureServices(services => + { + services.AddAzureStorageScaleServices(); + + FakeNameResolver nameResolver = new FakeNameResolver(); + nameResolver.Add(EventHubConnection1, EventHubsTestEnvironment.Instance.EventHubsConnectionString); + nameResolver.Add(EventHubConnection2, $"{EventHubsTestEnvironment.Instance.EventHubsNamespace}.servicebus.windows.net"); + services.AddSingleton(nameResolver); + }) + .ConfigureWebJobsScale((context, builder) => + { + builder.AddEventHubs(); + builder.UseHostId(hostId); + + foreach (var jtoken in JObject.Parse(triggers)["triggers"]) + { + TriggerMetadata metadata = new TriggerMetadata(jtoken as JObject); + if (metadata.FunctionName == Function2Name) + { + metadata.Properties[nameof(AzureComponentFactory)] = factoryWrapper; + } + builder.AddEventHubsScaleForTrigger(metadata); + } + }, + scaleOptions => + { + scaleOptions.IsTargetScalingEnabled = tbsEnabled; + scaleOptions.MetricsPurgeEnabled = false; + scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4); + scaleOptions.IsRuntimeScalingEnabled = true; + scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1); + }); + + IHost scaleHost = hostBuilder.Build(); + await scaleHost.StartAsync(); + + await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName); + await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); + await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) }); + + var opstions = scaleHost.GetOptions(); + var blobContainerClient = new BlobContainerClient(StorageTestEnvironment.Instance.StorageConnectionString, opstions.CheckpointContainer); + var blobCheckpointStoreInternal1 = new BlobCheckpointStoreInternal(blobContainerClient, Function1Name, loggerProvider.CreateLogger("Test")); + await blobCheckpointStoreInternal1.CreateIfNotExistsAsync(CancellationToken.None); + var blobCheckpointStoreInternal2 = new BlobCheckpointStoreInternal(blobContainerClient, Function2Name, loggerProvider.CreateLogger("Test")); + await blobCheckpointStoreInternal2.CreateIfNotExistsAsync(CancellationToken.None); + + await TestHelpers.Await(async () => + { + IScaleStatusProvider scaleStatusProvider = scaleHost.Services.GetService(); + + var scaleStatus = await scaleStatusProvider.GetScaleStatusAsync(new ScaleStatusContext()); + + bool scaledOut = false; + if (!tbsEnabled) + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == null + && scaleStatus.FunctionScaleStatuses[Function1Name].Vote == ScaleVote.ScaleOut + && scaleStatus.FunctionScaleStatuses[Function2Name].Vote == ScaleVote.ScaleOut; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains("2 scale monitors to sample", logMessages); + } + } + else + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == 2 + && scaleStatus.FunctionTargetScalerResults[Function1Name].TargetWorkerCount == 2 + && scaleStatus.FunctionTargetScalerResults[Function2Name].TargetWorkerCount == 2; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains("2 target scalers to sample", logMessages); + } + } + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.IsNotEmpty(logMessages.Where(x => x.StartsWith("Runtime scale monitoring is enabled."))); + if (!tbsEnabled) + { + Assert.Contains("Scaling out based on votes", logMessages); + } + } + + return scaledOut; + }, pollingInterval: 2000, timeout: 180000, throwWhenDebugging: true); + } + } +} \ No newline at end of file