diff --git a/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContextFactory.cs b/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContextFactory.cs index 7af8b5084..3b1686078 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContextFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContextFactory.cs @@ -51,6 +51,7 @@ internal class JobHostContextFactory : IJobHostContextFactory private readonly IScaleMonitorManager _monitorManager; private readonly IDrainModeManager _drainModeManager; private readonly IApplicationLifetime _applicationLifetime; + private readonly ITargetScalerManager _targetScalerManager; public JobHostContextFactory( IDashboardLoggingSetup dashboardLoggingSetup, @@ -73,7 +74,8 @@ public JobHostContextFactory( IAsyncCollector eventCollector, IScaleMonitorManager monitorManager, IDrainModeManager drainModeManager, - IApplicationLifetime applicationLifetime) + IApplicationLifetime applicationLifetime, + ITargetScalerManager targetScalerManager) { _dashboardLoggingSetup = dashboardLoggingSetup; _functionExecutor = functionExecutor; @@ -96,6 +98,7 @@ public JobHostContextFactory( _monitorManager = monitorManager; _drainModeManager = drainModeManager; _applicationLifetime = applicationLifetime; + _targetScalerManager = targetScalerManager; } public async Task Create(JobHost host, CancellationToken shutdownToken, CancellationToken cancellationToken) @@ -124,7 +127,8 @@ public async Task Create(JobHost host, CancellationToken shutdow // they are started). host.OnHostInitialized(); }; - IListenerFactory functionsListenerFactory = new HostListenerFactory(functions.ReadAll(), _singletonManager, _activator, _nameResolver, _loggerFactory, _monitorManager, listenersCreatedCallback, _jobHostOptions.Value.AllowPartialHostStartup, _drainModeManager); + IListenerFactory functionsListenerFactory = new HostListenerFactory(functions.ReadAll(), _singletonManager, _activator, _nameResolver, _loggerFactory, + _monitorManager, _targetScalerManager, listenersCreatedCallback, _jobHostOptions.Value.AllowPartialHostStartup, _drainModeManager); string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken); bool dashboardLoggingEnabled = _dashboardLoggingSetup.Setup(functions, functionsListenerFactory, out IFunctionExecutor hostCallExecutor, diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs index 48eb866aa..68624900e 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs @@ -88,6 +88,7 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/Microsoft.Azure.WebJobs.Host/Listeners/HostListenerFactory.cs b/src/Microsoft.Azure.WebJobs.Host/Listeners/HostListenerFactory.cs index b3ec20a11..ce5fe838f 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Listeners/HostListenerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Listeners/HostListenerFactory.cs @@ -14,6 +14,7 @@ using Microsoft.Azure.WebJobs.Logging; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace Microsoft.Azure.WebJobs.Host.Listeners { @@ -30,11 +31,11 @@ internal class HostListenerFactory : IListenerFactory private readonly bool _allowPartialHostStartup; private readonly Action _listenersCreatedCallback; private readonly IScaleMonitorManager _monitorManager; + private readonly ITargetScalerManager _targetScalerManager; private readonly IDrainModeManager _drainModeManager; - public HostListenerFactory(IEnumerable functionDefinitions, SingletonManager singletonManager, IJobActivator activator, - INameResolver nameResolver, ILoggerFactory loggerFactory, IScaleMonitorManager monitorManager, Action listenersCreatedCallback, bool allowPartialHostStartup = false, IDrainModeManager drainModeManager = null) + INameResolver nameResolver, ILoggerFactory loggerFactory, IScaleMonitorManager monitorManager, ITargetScalerManager targetScalerManager, Action listenersCreatedCallback, bool allowPartialHostStartup = false, IDrainModeManager drainModeManager = null) { _functionDefinitions = functionDefinitions; _singletonManager = singletonManager; @@ -44,6 +45,7 @@ public HostListenerFactory(IEnumerable functionDefinitions, _logger = _loggerFactory?.CreateLogger(LogCategories.Startup); _allowPartialHostStartup = allowPartialHostStartup; _monitorManager = monitorManager; + _targetScalerManager = targetScalerManager; _listenersCreatedCallback = listenersCreatedCallback; _drainModeManager = drainModeManager; } @@ -71,7 +73,8 @@ public async Task CreateAsync(CancellationToken cancellationToken) IListener listener = await listenerFactory.CreateAsync(cancellationToken); RegisterScaleMonitor(listener, _monitorManager); - + RegisterTargetScaler(listener, _targetScalerManager); + // if the listener is a Singleton, wrap it with our SingletonListener SingletonAttribute singletonAttribute = SingletonManager.GetListenerSingletonOrNull(listener.GetType(), functionDefinition.Descriptor); if (singletonAttribute != null) @@ -123,6 +126,27 @@ internal static void RegisterScaleMonitor(IListener listener, IScaleMonitorManag } } + internal static void RegisterTargetScaler(IListener listener, ITargetScalerManager targetScalerManager) + { + if (listener is ITargetScaler targetScaler) + { + targetScalerManager.Register(targetScaler); + } + else if (listener is ITargetScalerProvider targetScalerProvider) + { + var scaler = ((ITargetScalerProvider)listener).GetTargetScaler(); + targetScalerManager.Register(scaler); + } + else if (listener is IEnumerable) + { + // for composite listeners, we need to check all the inner listeners + foreach (var innerListener in ((IEnumerable)listener)) + { + RegisterTargetScaler(innerListener, targetScalerManager); + } + } + } + internal static bool IsDisabled(MethodInfo method, INameResolver nameResolver, IJobActivator activator, IConfiguration configuration) { // First try to resolve disabled state by setting diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMonitorManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMonitorManager.cs index e1269d2fe..a78517198 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMonitorManager.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMonitorManager.cs @@ -24,7 +24,7 @@ public interface IScaleMonitorManager /// Should only be called after the host has been started and all /// instances are registered. /// - /// The collection of monitor intances. + /// The collection of monitor instances. IEnumerable GetMonitors(); } } diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaler.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaler.cs new file mode 100644 index 000000000..bcb4bf69c --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaler.cs @@ -0,0 +1,27 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Interface defining an Azure Functions scaler that makes scale decisions based on current + /// event source metrics and function concurrency. + /// + public interface ITargetScaler + { + /// + /// Returns the for this target scaler. + /// + TargetScalerDescriptor TargetScalerDescriptor { get; } + + /// + /// Return the current scale result based on the specified context. + /// + /// The to use to determine + /// the scale result. + /// The scale result. + Task GetScaleResultAsync(TargetScalerContext context); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerManager.cs new file mode 100644 index 000000000..076801afb --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerManager.cs @@ -0,0 +1,30 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Manager for registering and accessing instances for + /// a instance. + /// + public interface ITargetScalerManager + { + /// + /// Register an instance. + /// + /// The target scaler instance to register. + void Register(ITargetScaler scaler); + + /// + /// Get all registered target scaler instances. + /// + /// + /// Should only be called after the host has been started and all + /// instances are registered. + /// + /// The collection of target scaler instances. + IEnumerable GetTargetScalers(); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerProvider.cs new file mode 100644 index 000000000..a08ed0355 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerProvider.cs @@ -0,0 +1,21 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Provider interface for returning an instance. + /// + /// + /// Listeners can implement directly, but in some + /// cases the decoupling afforded by this interface is needed. + /// + public interface ITargetScalerProvider + { + /// + /// Gets the instance. + /// + /// The instance. + ITargetScaler GetTargetScaler(); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerContext.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerContext.cs new file mode 100644 index 000000000..478fd0732 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerContext.cs @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Context used by to decide + /// scale result. + /// + public class TargetScalerContext + { + /// + /// The current concurrency for the target function. + /// + /// + /// When not specified, the scaler will determine the concurrency based on configuration. + /// + public int? InstanceConcurrency { get; set; } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerDescriptor.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerDescriptor.cs new file mode 100644 index 000000000..4ae9aebb5 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerDescriptor.cs @@ -0,0 +1,21 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Metadata descriptor for an . + /// + public class TargetScalerDescriptor + { + public TargetScalerDescriptor(string functionId) + { + FunctionId = functionId; + } + + /// + /// Gets the ID of the function associated with this scaler. + /// + public string FunctionId { get; } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerManager.cs new file mode 100644 index 000000000..35ea88062 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerManager.cs @@ -0,0 +1,44 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + internal class TargetScalerManager : ITargetScalerManager + { + private readonly List _targetScalers = new List(); + private object _syncRoot = new object(); + + public TargetScalerManager() + { + } + + public TargetScalerManager(IEnumerable targetScalers) + { + // add any initial target scalers coming from DI + // additional monitors can be added at runtime + // via Register + _targetScalers.AddRange(targetScalers); + } + + public void Register(ITargetScaler targetScaler) + { + lock (_syncRoot) + { + if (!_targetScalers.Contains(targetScaler)) + { + _targetScalers.Add(targetScaler); + } + } + } + + public IEnumerable GetTargetScalers() + { + lock (_syncRoot) + { + return _targetScalers.AsReadOnly(); + } + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerResult.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerResult.cs new file mode 100644 index 000000000..2427a22d2 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerResult.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Represents the scale result of a target scaler. + /// + public class TargetScalerResult + { + /// + /// Gets or sets the target worker count. + /// + public int TargetWorkerCount { get; set; } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/HostListenerFactoryTests.cs b/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/HostListenerFactoryTests.cs index 1f08d3b5b..7a467c5a9 100644 --- a/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/HostListenerFactoryTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/HostListenerFactoryTests.cs @@ -16,6 +16,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using Moq; using Xunit; @@ -58,8 +59,9 @@ public async Task CreateAsync_RegistersScaleMonitors() functions.Add(definition); var monitorManager = new ScaleMonitorManager(); + var targetScaleManager = new TargetScalerManager(); var drainModeManagerMock = new Mock(); - HostListenerFactory factory = new HostListenerFactory(functions, singletonManager, _jobActivator, null, loggerFactory, monitorManager, () => { }, false, drainModeManagerMock.Object); + HostListenerFactory factory = new HostListenerFactory(functions, singletonManager, _jobActivator, null, loggerFactory, monitorManager, targetScaleManager, () => { }, false, drainModeManagerMock.Object); IListener listener = await factory.CreateAsync(CancellationToken.None); var innerListeners = ((IEnumerable)listener).ToArray(); @@ -69,6 +71,39 @@ public async Task CreateAsync_RegistersScaleMonitors() Assert.Same(testListener, monitors[0]); } + [Fact] + public async Task CreateAsync_RegistersTargetScalers() + { + Mock mockFunctionDefinition = new Mock(); + Mock mockInstanceFactory = new Mock(MockBehavior.Strict); + Mock mockListenerFactory = new Mock(MockBehavior.Strict); + var testListener = new TestListener_TargetScaler(); + mockListenerFactory.Setup(p => p.CreateAsync(It.IsAny())).ReturnsAsync(testListener); + SingletonManager singletonManager = new SingletonManager(); + + ILoggerFactory loggerFactory = new LoggerFactory(); + TestLoggerProvider loggerProvider = new TestLoggerProvider(); + loggerFactory.AddProvider(loggerProvider); + + List functions = new List(); + var method = typeof(Functions1).GetMethod("TestJob", BindingFlags.Public | BindingFlags.Static); + FunctionDescriptor descriptor = FunctionIndexer.FromMethod(method, _configuration, _jobActivator); + FunctionDefinition definition = new FunctionDefinition(descriptor, mockInstanceFactory.Object, mockListenerFactory.Object); + functions.Add(definition); + + var monitorManager = new ScaleMonitorManager(); + var targetScaleManager = new TargetScalerManager(); + var drainModeManagerMock = new Mock(); + HostListenerFactory factory = new HostListenerFactory(functions, singletonManager, _jobActivator, null, loggerFactory, monitorManager, targetScaleManager, () => { }, false, drainModeManagerMock.Object); + IListener listener = await factory.CreateAsync(CancellationToken.None); + + var innerListeners = ((IEnumerable)listener).ToArray(); + + var targetScalers = targetScaleManager.GetTargetScalers().ToArray(); + Assert.Single(targetScalers); + Assert.Same(testListener, targetScalers[0]); + } + [Fact] public void RegisterScaleMonitor_Succeeds() { @@ -98,6 +133,35 @@ public void RegisterScaleMonitor_Succeeds() Assert.Same(testListenerMonitorProvider.GetMonitor(), monitors[1]); } + [Fact] + public void RegisterTargetScaler_Succeeds() + { + // listener is a direct target scaler + var manager = new TargetScalerManager(); + var testListener = new TestListener_TargetScaler(); + HostListenerFactory.RegisterTargetScaler(testListener, manager); + var targetScalers = manager.GetTargetScalers().ToArray(); + Assert.Single(targetScalers); + Assert.Same(testListener, targetScalers[0]); + + // listener is a target scaler provider + manager = new TargetScalerManager(); + var providerListener = new TestListener_TargetScalerProvider(); + HostListenerFactory.RegisterTargetScaler(providerListener, manager); + targetScalers = manager.GetTargetScalers().ToArray(); + Assert.Single(targetScalers); + Assert.Same(providerListener.GetTargetScaler(), targetScalers[0]); + + // listener is composite, so we expect recursion + manager = new TargetScalerManager(); + var compositListener = new CompositeListener(testListener, providerListener); + HostListenerFactory.RegisterTargetScaler(compositListener, manager); + targetScalers = manager.GetTargetScalers().ToArray(); + Assert.Equal(2, targetScalers.Length); + Assert.Same(testListener, targetScalers[0]); + Assert.Same(providerListener.GetTargetScaler(), targetScalers[1]); + } + [Theory] [InlineData(typeof(Functions1), "DisabledAtParameterLevel")] [InlineData(typeof(Functions1), "DisabledAtMethodLevel")] @@ -138,8 +202,9 @@ public async Task CreateAsync_SkipsDisabledFunctions(Type jobType, string method // Create the composite listener - this will fail if any of the // function definitions indicate that they are not disabled var monitorManagerMock = new Mock(MockBehavior.Strict); + var targetScalerManagerMock = new Mock(MockBehavior.Strict); var drainModeManagerMock = new Mock(); - HostListenerFactory factory = new HostListenerFactory(functions, singletonManager, _jobActivator, null, loggerFactory, monitorManagerMock.Object, () => { }, false, drainModeManagerMock.Object); + HostListenerFactory factory = new HostListenerFactory(functions, singletonManager, _jobActivator, null, loggerFactory, monitorManagerMock.Object, targetScalerManagerMock.Object, () => { }, false, drainModeManagerMock.Object); IListener listener = await factory.CreateAsync(CancellationToken.None); @@ -196,7 +261,7 @@ public void IsDisabledBySetting_BindsSettingName(string settingName, bool disabl { "Disable_TestJob", "False" }, }) .Build(); - + Mock mockNameResolver = new Mock(MockBehavior.Strict); mockNameResolver.Setup(p => p.Resolve("Test")).Returns("TestValue"); @@ -405,5 +470,95 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext context) } } } + + public class TestListener_TargetScaler : IListener, ITargetScaler + { + public TargetScalerDescriptor TargetScalerDescriptor => throw new NotImplementedException(); + + public void Cancel() + { + throw new NotImplementedException(); + } + + public void Dispose() + { + throw new NotImplementedException(); + } + + public Task GetScaleResultAsync(TargetScalerContext context) + { + throw new NotImplementedException(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + } + + public class TestListener_TargetScalerProvider : IListener, ITargetScalerProvider + { + private readonly ITargetScaler _targetScaler; + + public TestListener_TargetScalerProvider() + { + _targetScaler = new TestListener_TargetScalerImpl(); + } + + public void Cancel() + { + throw new NotImplementedException(); + } + + public void Dispose() + { + throw new NotImplementedException(); + } + + public ITargetScaler GetTargetScaler() + { + return _targetScaler; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + + public class TestListener_TargetScalerImpl : ITargetScaler + { + public TargetScalerDescriptor TargetScalerDescriptor => throw new NotImplementedException(); + + public Task GetMetricsAsync() + { + throw new NotImplementedException(); + } + + public Task GetScaleResultAsync(TargetScalerContext context) + { + throw new NotImplementedException(); + } + } + } + + public class TestListener_MonitorAndTargetScaler : TestListener_Monitor, ITargetScaler + { + public TargetScalerDescriptor TargetScalerDescriptor => throw new NotImplementedException(); + + public Task GetScaleResultAsync(TargetScalerContext context) + { + throw new NotImplementedException(); + } + } } } diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs index 27021688d..7bf8c0362 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs @@ -297,7 +297,13 @@ public void WebJobs_Host_VerifyPublicSurfaceArea() "FunctionActivityStatus", "IFunctionActivityStatusProvider", "SupportsRetryAttribute", - "AppServicesHostingUtility" + "AppServicesHostingUtility", + "ITargetScaler", + "ITargetScalerManager", + "ITargetScalerProvider", + "TargetScalerDescriptor", + "TargetScalerResult", + "TargetScalerContext" }; TestHelpers.AssertPublicTypes(expected, assembly);