diff --git a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs
index f395c23de..38151e3cc 100644
--- a/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs
+++ b/src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvider.cs
@@ -10,6 +10,7 @@
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using Microsoft.Extensions.Logging;
+using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
@@ -200,6 +201,16 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
}
#if !FUNCTIONS_V1
+
+ internal DurableTaskMetricsProvider GetMetricsProvider(
+ string functionName,
+ string hubName,
+ CloudStorageAccount storageAccount,
+ ILogger logger)
+ {
+ return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
+ }
+
///
public override bool TryGetScaleMonitor(
string functionId,
@@ -208,12 +219,31 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
+ CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
+ DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
- this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(),
- this.logger);
+ storageAccount,
+ this.logger,
+ metricsProvider);
+ return true;
+ }
+
+#endif
+#if FUNCTIONS_V3_OR_GREATER
+ public override bool TryGetTargetScaler(
+ string functionId,
+ string functionName,
+ string hubName,
+ string connectionName,
+ out ITargetScaler targetScaler)
+ {
+ // This is only called by the ScaleController, it doesn't run in the Functions Host process.
+ CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
+ DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
+ targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
}
#endif
diff --git a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
index 2bc6f22e2..529a7adab 100644
--- a/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
@@ -565,5 +565,27 @@ public virtual bool TryGetScaleMonitor(
return false;
}
#endif
+
+#if FUNCTIONS_V3_OR_GREATER
+ ///
+ /// Tries to obtain a scaler for target based scaling.
+ ///
+ /// Function id.
+ /// Function name.
+ /// Task hub name.
+ /// The name of the storage-specific connection settings.
+ /// The target-based scaler.
+ /// True if target-based scaling is supported, false otherwise.
+ public virtual bool TryGetTargetScaler(
+ string functionId,
+ string functionName,
+ string hubName,
+ string connectionName,
+ out ITargetScaler targetScaler)
+ {
+ targetScaler = null;
+ return false;
+ }
+#endif
}
}
diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
index cc1e63976..dd351c1fe 100644
--- a/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
@@ -150,13 +150,13 @@ public DurableTaskExtension(
this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService));
- this.ResolveAppSettingOptions();
+ DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);
ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName);
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper();
- this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
+ this.durabilityProviderFactory = GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider();
this.isOptionsConfigured = true;
@@ -249,6 +249,8 @@ public string HubName
internal DurableTaskOptions Options { get; }
+ internal DurabilityProvider DefaultDurabilityProvider => this.defaultDurabilityProvider;
+
internal HttpApiHandler HttpApiHandler { get; private set; }
internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; }
@@ -296,7 +298,7 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}
- private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable orchestrationServiceFactories)
+ internal static IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable orchestrationServiceFactories)
{
bool storageTypeIsConfigured = options.StorageProvider.TryGetValue("type", out object storageType);
@@ -578,32 +580,13 @@ private void StopLocalGrpcServer()
}
#endif
- private void ResolveAppSettingOptions()
- {
- if (this.Options == null)
- {
- throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings.");
- }
-
- if (this.nameResolver == null)
- {
- throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings.");
- }
-
- if (this.nameResolver.TryResolveWholeString(this.Options.HubName, out string taskHubName))
- {
- // use the resolved task hub name
- this.Options.HubName = taskHubName;
- }
- }
-
private void InitializeForFunctionsV1(ExtensionConfigContext context)
{
#if FUNCTIONS_V1
context.ApplyConfig(this.Options, "DurableTask");
this.nameResolver = context.Config.NameResolver;
this.loggerFactory = context.Config.LoggerFactory;
- this.ResolveAppSettingOptions();
+ DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);
ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName);
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.connectionInfoResolver = new WebJobsConnectionInfoProvider();
@@ -1573,59 +1556,6 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat
activity.AddTag("DurableFunctionsRuntimeStatus", statusStr);
}
}
-
- internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string connectionName)
- {
- if (this.defaultDurabilityProvider.TryGetScaleMonitor(
- functionId,
- functionName.Name,
- this.Options.HubName,
- connectionName,
- out IScaleMonitor scaleMonitor))
- {
- return scaleMonitor;
- }
- else
- {
- // the durability provider does not support runtime scaling.
- // Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
- return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower());
- }
- }
-
- ///
- /// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
- /// This is required to allow operation of those providers even if runtime scaling is turned off
- /// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
- ///
- private sealed class NoOpScaleMonitor : IScaleMonitor
- {
- ///
- /// Construct a placeholder scale monitor.
- ///
- /// A descriptive name.
- public NoOpScaleMonitor(string name)
- {
- this.Descriptor = new ScaleMonitorDescriptor(name);
- }
-
- ///
- /// A descriptive name.
- ///
- public ScaleMonitorDescriptor Descriptor { get; private set; }
-
- ///
- Task IScaleMonitor.GetMetricsAsync()
- {
- throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
- }
-
- ///
- ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
- {
- throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
- }
- }
#endif
}
}
diff --git a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs
index ac98d8632..bd793c17a 100644
--- a/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs
+++ b/src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigurationExtensions.cs
@@ -2,16 +2,19 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
-using System.Net.Http;
-using System.Threading;
+using System.Collections.Generic;
+using System.Linq;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
+using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
+using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
#else
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
@@ -109,6 +112,27 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder)
return builder;
}
+#if FUNCTIONS_V3_OR_GREATER
+ ///
+ /// Adds the and providers for the Durable Triggers.
+ ///
+ /// The to configure.
+ /// Returns the provided .
+ internal static IWebJobsBuilder AddDurableScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
+ {
+ // this segment adheres to the followings pattern: https://github.com/Azure/azure-sdk-for-net/pull/38756
+ DurableTaskTriggersScaleProvider provider = null;
+ builder.Services.AddSingleton(serviceProvider =>
+ {
+ provider = new DurableTaskTriggersScaleProvider(serviceProvider.GetService>(), serviceProvider.GetService(), serviceProvider.GetService(), serviceProvider.GetService>(), triggerMetadata);
+ return provider;
+ });
+ builder.Services.AddSingleton(serviceProvider => serviceProvider.GetServices().Single(x => x == provider));
+ builder.Services.AddSingleton(serviceProvider => serviceProvider.GetServices().Single(x => x == provider));
+ return builder;
+ }
+#endif
+
///
/// Adds the Durable Task extension to the provided .
///
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs
index e764ae623..4fa87cc07 100644
--- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs
@@ -2,12 +2,9 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.
using System;
-using System.Collections.Generic;
-using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-using DurableTask.AzureStorage.Monitoring;
-using Microsoft.Azure.WebJobs.Host.Executors;
+using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
using Microsoft.Azure.WebJobs.Host.Listeners;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -15,7 +12,9 @@
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
-#if !FUNCTIONS_V1
+#if FUNCTIONS_V3_OR_GREATER
+ internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
+#elif FUNCTIONS_V2_OR_GREATER
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider
#else
internal sealed class DurableTaskListener : IListener
@@ -26,10 +25,15 @@ internal sealed class DurableTaskListener : IListener
private readonly FunctionName functionName;
private readonly FunctionType functionType;
private readonly string connectionName;
+
#if !FUNCTIONS_V1
private readonly Lazy scaleMonitor;
#endif
+#if FUNCTIONS_V3_OR_GREATER
+ private readonly Lazy targetScaler;
+#endif
+
public DurableTaskListener(
DurableTaskExtension config,
string functionId,
@@ -48,12 +52,25 @@ public DurableTaskListener(
this.functionName = functionName;
this.functionType = functionType;
this.connectionName = connectionName;
+
#if !FUNCTIONS_V1
this.scaleMonitor = new Lazy(() =>
- this.config.GetScaleMonitor(
+ ScaleUtils.GetScaleMonitor(
+ this.config.DefaultDurabilityProvider,
this.functionId,
this.functionName,
- this.connectionName));
+ this.connectionName,
+ this.config.Options.HubName));
+
+#endif
+#if FUNCTIONS_V3_OR_GREATER
+ this.targetScaler = new Lazy(() =>
+ ScaleUtils.GetTargetScaler(
+ this.config.DefaultDurabilityProvider,
+ this.functionId,
+ this.functionName,
+ this.connectionName,
+ this.config.Options.HubName));
#endif
}
@@ -98,5 +115,12 @@ public IScaleMonitor GetMonitor()
return this.scaleMonitor.Value;
}
#endif
+
+#if FUNCTIONS_V3_OR_GREATER
+ public ITargetScaler GetTargetScaler()
+ {
+ return this.targetScaler.Value;
+ }
+#endif
}
}
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs
new file mode 100644
index 000000000..e3565d169
--- /dev/null
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskMetricsProvider.cs
@@ -0,0 +1,79 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for license information.
+
+#if !FUNCTIONS_V1
+using System;
+using System.Threading.Tasks;
+using DurableTask.AzureStorage.Monitoring;
+using Microsoft.Extensions.Logging;
+using Microsoft.WindowsAzure.Storage;
+using Newtonsoft.Json;
+
+namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
+{
+ internal class DurableTaskMetricsProvider
+ {
+ private readonly string functionName;
+ private readonly string hubName;
+ private readonly ILogger logger;
+ private readonly CloudStorageAccount storageAccount;
+
+ private DisconnectedPerformanceMonitor performanceMonitor;
+
+ public DurableTaskMetricsProvider(string functionName, string hubName, ILogger logger, DisconnectedPerformanceMonitor performanceMonitor, CloudStorageAccount storageAccount)
+ {
+ this.functionName = functionName;
+ this.hubName = hubName;
+ this.logger = logger;
+ this.performanceMonitor = performanceMonitor;
+ this.storageAccount = storageAccount;
+ }
+
+ public virtual async Task GetMetricsAsync()
+ {
+ DurableTaskTriggerMetrics metrics = new DurableTaskTriggerMetrics();
+
+ // Durable stores its own metrics, so we just collect them here
+ PerformanceHeartbeat heartbeat = null;
+ try
+ {
+ DisconnectedPerformanceMonitor performanceMonitor = this.GetPerformanceMonitor();
+ heartbeat = await performanceMonitor.PulseAsync();
+ }
+ catch (StorageException e)
+ {
+ this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
+ }
+
+ if (heartbeat != null)
+ {
+ metrics.PartitionCount = heartbeat.PartitionCount;
+ metrics.ControlQueueLengths = JsonConvert.SerializeObject(heartbeat.ControlQueueLengths);
+ metrics.ControlQueueLatencies = JsonConvert.SerializeObject(heartbeat.ControlQueueLatencies);
+ metrics.WorkItemQueueLength = heartbeat.WorkItemQueueLength;
+ if (heartbeat.WorkItemQueueLatency > TimeSpan.Zero)
+ {
+ metrics.WorkItemQueueLatency = heartbeat.WorkItemQueueLatency.ToString();
+ }
+ }
+
+ return metrics;
+ }
+
+ internal DisconnectedPerformanceMonitor GetPerformanceMonitor()
+ {
+ if (this.performanceMonitor == null)
+ {
+ if (this.storageAccount == null)
+ {
+ throw new ArgumentNullException(nameof(this.storageAccount));
+ }
+
+ this.performanceMonitor = new DisconnectedPerformanceMonitor(this.storageAccount, this.hubName);
+ }
+
+ return this.performanceMonitor;
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs
index 3bf18598d..4c05b3df0 100644
--- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskScaleMonitor.cs
@@ -22,6 +22,7 @@ internal sealed class DurableTaskScaleMonitor : IScaleMonitor IScaleMonitor.GetMetricsAsync()
@@ -72,33 +75,7 @@ async Task IScaleMonitor.GetMetricsAsync()
public async Task GetMetricsAsync()
{
- DurableTaskTriggerMetrics metrics = new DurableTaskTriggerMetrics();
-
- // Durable stores its own metrics, so we just collect them here
- PerformanceHeartbeat heartbeat = null;
- try
- {
- DisconnectedPerformanceMonitor performanceMonitor = this.GetPerformanceMonitor();
- heartbeat = await performanceMonitor.PulseAsync();
- }
- catch (StorageException e)
- {
- this.logger.LogWarning("{details}. Function: {functionName}. HubName: {hubName}.", e.ToString(), this.functionName, this.hubName);
- }
-
- if (heartbeat != null)
- {
- metrics.PartitionCount = heartbeat.PartitionCount;
- metrics.ControlQueueLengths = JsonConvert.SerializeObject(heartbeat.ControlQueueLengths);
- metrics.ControlQueueLatencies = JsonConvert.SerializeObject(heartbeat.ControlQueueLatencies);
- metrics.WorkItemQueueLength = heartbeat.WorkItemQueueLength;
- if (heartbeat.WorkItemQueueLatency > TimeSpan.Zero)
- {
- metrics.WorkItemQueueLatency = heartbeat.WorkItemQueueLatency.ToString();
- }
- }
-
- return metrics;
+ return await this.durableTaskMetricsProvider.GetMetricsAsync();
}
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
@@ -151,7 +128,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, DurableTaskTriggerMetric
}
}
- DisconnectedPerformanceMonitor performanceMonitor = this.GetPerformanceMonitor();
+ DisconnectedPerformanceMonitor performanceMonitor = this.durableTaskMetricsProvider.GetPerformanceMonitor();
var scaleRecommendation = performanceMonitor.MakeScaleRecommendation(workerCount, heartbeats.ToArray());
bool writeToUserLogs = false;
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs
new file mode 100644
index 000000000..c1faa94f4
--- /dev/null
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetScaler.cs
@@ -0,0 +1,94 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for license information.
+
+#if FUNCTIONS_V3_OR_GREATER
+#nullable enable
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+
+namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
+{
+ internal class DurableTaskTargetScaler : ITargetScaler
+ {
+ private readonly DurableTaskMetricsProvider metricsProvider;
+ private readonly TargetScalerResult scaleResult;
+ private readonly DurabilityProvider durabilityProvider;
+ private readonly ILogger logger;
+ private readonly string functionId;
+
+ public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider metricsProvider, DurabilityProvider durabilityProvider, ILogger logger)
+ {
+ this.functionId = functionId;
+ this.metricsProvider = metricsProvider;
+ this.scaleResult = new TargetScalerResult();
+ this.TargetScalerDescriptor = new TargetScalerDescriptor(this.functionId);
+ this.durabilityProvider = durabilityProvider;
+ this.logger = logger;
+ }
+
+ public TargetScalerDescriptor TargetScalerDescriptor { get; }
+
+ private int MaxConcurrentActivities => this.durabilityProvider.MaxConcurrentTaskActivityWorkItems;
+
+ private int MaxConcurrentOrchestrators => this.durabilityProvider.MaxConcurrentTaskOrchestrationWorkItems;
+
+ public async Task GetScaleResultAsync(TargetScalerContext context)
+ {
+ DurableTaskTriggerMetrics? metrics = null;
+ try
+ {
+ // This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process.
+ metrics = await this.metricsProvider.GetMetricsAsync();
+
+ // compute activityWorkers: the number of workers we need to process all activity messages
+ var workItemQueueLength = metrics.WorkItemQueueLength;
+ double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities);
+
+ var serializedControlQueueLengths = metrics.ControlQueueLengths;
+ var controlQueueLengths = JsonConvert.DeserializeObject>(serializedControlQueueLengths);
+
+ var controlQueueMessages = controlQueueLengths.Sum();
+ var activeControlQueues = controlQueueLengths.Count(x => x > 0);
+
+ // compute orchestratorWorkers: the number of workers we need to process all orchestrator messages.
+ // We bound this result to be no larger than the partition count
+ var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators);
+ var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers);
+
+ int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers);
+ this.scaleResult.TargetWorkerCount = numWorkersToRequest;
+
+ // When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table.
+ // This works because this code does not execute in the Functions Host process, but in the ScaleController process,
+ // and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
+ var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
+ $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
+ var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
+ metricsLog;
+
+ // target worker count should never be negative
+ if (numWorkersToRequest < 0)
+ {
+ throw new InvalidOperationException("Number of workers to request cannot be negative");
+ }
+
+ this.logger.LogInformation(scaleControllerLog);
+ return this.scaleResult;
+ }
+ catch (Exception ex)
+ {
+ // We want to augment the exception with metrics information for investigation purposes
+ var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
+ $"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
+ var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
+ throw new Exception(errorLog, ex);
+ }
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs
index 43008bbd9..68fa8a10c 100644
--- a/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs
+++ b/src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTriggerMetrics.cs
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See LICENSE in the project root for license information.
#if !FUNCTIONS_V1
+using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Host.Scale;
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
@@ -10,13 +11,13 @@ internal class DurableTaskTriggerMetrics : ScaleMetrics
///
/// The number of partitions in the task hub.
///
- public int PartitionCount { get; set; }
+ public virtual int PartitionCount { get; set; }
///
/// The number of messages across control queues. This will
/// be in the form of a serialized array of ints, e.g. "[1,2,3,4]".
///
- public string ControlQueueLengths { get; set; }
+ public virtual string ControlQueueLengths { get; set; }
///
/// The latency of messages across control queues. This will
@@ -28,7 +29,7 @@ internal class DurableTaskTriggerMetrics : ScaleMetrics
///
/// The number of messages in the work-item queue.
///
- public int WorkItemQueueLength { get; set; }
+ public virtual int WorkItemQueueLength { get; set; }
///
/// The approximate age of the first work-item queue message. This
diff --git a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml
index 518e1305d..f76c0ad61 100644
--- a/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml
+++ b/src/WebJobs.Extensions.DurableTask/Microsoft.Azure.WebJobs.Extensions.DurableTask.xml
@@ -2634,30 +2634,6 @@
This metadata will show up in Application Insights, if enabled.
-
-
- A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
- This is required to allow operation of those providers even if runtime scaling is turned off
- see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
-
-
-
-
- Construct a placeholder scale monitor.
-
- A descriptive name.
-
-
-
- A descriptive name.
-
-
-
-
-
-
-
-
Extension for registering a Durable Functions configuration with JobHostConfiguration.
@@ -4850,6 +4826,31 @@
The delegate to handle exception to determine if retries should proceed.
+
+
+ A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
+ This is required to allow operation of those providers even if runtime scaling is turned off
+ see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
+
+
+
+
+ Construct a placeholder scale monitor.
+
+ A descriptive name.
+ The function ID.
+
+
+
+ A descriptive name.
+
+
+
+
+
+
+
+
Connection info provider which resolves connection information from a standard application (Non WebJob).
diff --git a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs
index 47249848a..e5eac789f 100644
--- a/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs
+++ b/src/WebJobs.Extensions.DurableTask/Options/DurableTaskOptions.cs
@@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Net.Http;
using DurableTask.AzureStorage.Partitioning;
+using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
@@ -229,6 +230,25 @@ public string HubName
// to mock the value from ExtensionConfigContext. It should not be used in production code paths.
internal Func WebhookUriProviderOverride { get; set; }
+ internal static void ResolveAppSettingOptions(DurableTaskOptions options, INameResolver nameResolver)
+ {
+ if (options == null)
+ {
+ throw new InvalidOperationException($"{nameof(options)} must be set before resolving app settings.");
+ }
+
+ if (nameResolver == null)
+ {
+ throw new InvalidOperationException($"{nameof(nameResolver)} must be set before resolving app settings.");
+ }
+
+ if (nameResolver.TryResolveWholeString(options.HubName, out string taskHubName))
+ {
+ // use the resolved task hub name
+ options.HubName = taskHubName;
+ }
+ }
+
///
/// Sets HubName to a value that is considered a default value.
///
diff --git a/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs b/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs
new file mode 100644
index 000000000..fd50f9e38
--- /dev/null
+++ b/src/WebJobs.Extensions.DurableTask/Scale/DurableTaskTriggersScaleProvider.cs
@@ -0,0 +1,129 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for license information.
+#nullable enable
+#if FUNCTIONS_V3_OR_GREATER
+
+using System;
+using System.Collections.Generic;
+using System.ComponentModel;
+using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+
+namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale
+{
+ internal class DurableTaskTriggersScaleProvider : IScaleMonitorProvider, ITargetScalerProvider
+ {
+ private readonly IScaleMonitor monitor;
+ private readonly ITargetScaler targetScaler;
+ private readonly DurableTaskOptions options;
+ private readonly INameResolver nameResolver;
+ private readonly ILoggerFactory loggerFactory;
+ private readonly IEnumerable durabilityProviderFactories;
+
+ public DurableTaskTriggersScaleProvider(
+ IOptions durableTaskOptions,
+ INameResolver nameResolver,
+ ILoggerFactory loggerFactory,
+ IEnumerable durabilityProviderFactories,
+ TriggerMetadata triggerMetadata)
+ {
+ this.options = durableTaskOptions.Value;
+ this.nameResolver = nameResolver;
+ this.loggerFactory = loggerFactory;
+ this.durabilityProviderFactories = durabilityProviderFactories;
+
+ string functionId = triggerMetadata.FunctionName;
+ FunctionName functionName = new FunctionName(functionId);
+
+ this.GetOptions(triggerMetadata);
+
+ IDurabilityProviderFactory durabilityProviderFactory = this.GetDurabilityProviderFactory();
+ DurabilityProvider defaultDurabilityProvider = durabilityProviderFactory.GetDurabilityProvider();
+
+ string? connectionName = durabilityProviderFactory is AzureStorageDurabilityProviderFactory azureStorageDurabilityProviderFactory
+ ? azureStorageDurabilityProviderFactory.DefaultConnectionName
+ : null;
+
+ this.targetScaler = ScaleUtils.GetTargetScaler(
+ defaultDurabilityProvider,
+ functionId,
+ functionName,
+ connectionName,
+ this.options.HubName);
+
+ this.monitor = ScaleUtils.GetScaleMonitor(
+ defaultDurabilityProvider,
+ functionId,
+ functionName,
+ connectionName,
+ this.options.HubName);
+ }
+
+ private void GetOptions(TriggerMetadata triggerMetadata)
+ {
+ // the metadata is the sync triggers payload
+ var metadata = triggerMetadata.Metadata.ToObject();
+
+ // The property `taskHubName` is always expected in the SyncTriggers payload
+ this.options.HubName = metadata?.TaskHubName ?? throw new InvalidOperationException($"Expected `taskHubName` property in SyncTriggers payload but found none. Payload: {triggerMetadata.Metadata}");
+ if (metadata?.MaxConcurrentActivityFunctions != null)
+ {
+ this.options.MaxConcurrentActivityFunctions = metadata?.MaxConcurrentActivityFunctions;
+ }
+
+ if (metadata?.MaxConcurrentOrchestratorFunctions != null)
+ {
+ this.options.MaxConcurrentOrchestratorFunctions = metadata?.MaxConcurrentOrchestratorFunctions;
+ }
+
+ if (metadata?.StorageProvider != null)
+ {
+ this.options.StorageProvider = metadata?.StorageProvider;
+ }
+
+ DurableTaskOptions.ResolveAppSettingOptions(this.options, this.nameResolver);
+ }
+
+ private IDurabilityProviderFactory GetDurabilityProviderFactory()
+ {
+ var logger = this.loggerFactory.CreateLogger();
+ var durabilityProviderFactory = DurableTaskExtension.GetDurabilityProviderFactory(this.options, logger, this.durabilityProviderFactories);
+ return durabilityProviderFactory;
+ }
+
+ public IScaleMonitor GetMonitor()
+ {
+ return this.monitor;
+ }
+
+ public ITargetScaler GetTargetScaler()
+ {
+ return this.targetScaler;
+ }
+
+ ///
+ /// Captures the relevant DF SyncTriggers JSON properties for making scaling decisions.
+ ///
+ internal class DurableTaskMetadata
+ {
+ [JsonProperty]
+ public string? TaskHubName { get; set; }
+
+ [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)]
+ [DefaultValue(null)]
+ public int? MaxConcurrentOrchestratorFunctions { get; set; }
+
+ [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)]
+ [DefaultValue(null)]
+ public int? MaxConcurrentActivityFunctions { get; set; }
+
+ [JsonProperty(DefaultValueHandling = DefaultValueHandling.Populate)]
+ [DefaultValue(null)]
+ public IDictionary? StorageProvider { get; set; }
+ }
+ }
+}
+#endif
\ No newline at end of file
diff --git a/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs b/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs
new file mode 100644
index 000000000..0d60da23f
--- /dev/null
+++ b/src/WebJobs.Extensions.DurableTask/Scale/ScaleUtils.cs
@@ -0,0 +1,120 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for license information.
+#nullable enable
+
+using System;
+using System.Threading.Tasks;
+
+#if !FUNCTIONS_V1
+using Microsoft.Azure.WebJobs.Host.Scale;
+#endif
+
+namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale
+{
+ internal static class ScaleUtils
+ {
+#if !FUNCTIONS_V1
+ internal static IScaleMonitor GetScaleMonitor(DurabilityProvider durabilityProvider, string functionId, FunctionName functionName, string? connectionName, string hubName)
+ {
+ if (durabilityProvider.TryGetScaleMonitor(
+ functionId,
+ functionName.Name,
+ hubName,
+ connectionName,
+ out IScaleMonitor scaleMonitor))
+ {
+ return scaleMonitor;
+ }
+ else
+ {
+ // the durability provider does not support runtime scaling.
+ // Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
+ return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{hubName}".ToLower(), functionId);
+ }
+ }
+
+ ///
+ /// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
+ /// This is required to allow operation of those providers even if runtime scaling is turned off
+ /// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
+ ///
+ internal sealed class NoOpScaleMonitor : IScaleMonitor
+ {
+ ///
+ /// Construct a placeholder scale monitor.
+ ///
+ /// A descriptive name.
+ /// The function ID.
+ public NoOpScaleMonitor(string name, string functionId)
+ {
+#if FUNCTIONS_V3_OR_GREATER
+ this.Descriptor = new ScaleMonitorDescriptor(name, functionId);
+#else
+#pragma warning disable CS0618 // Type or member is obsolete
+ this.Descriptor = new ScaleMonitorDescriptor(name);
+#pragma warning restore CS0618 // Type or member is obsolete
+#endif
+ }
+
+ ///
+ /// A descriptive name.
+ ///
+ public ScaleMonitorDescriptor Descriptor { get; private set; }
+
+ ///
+ Task IScaleMonitor.GetMetricsAsync()
+ {
+ throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
+ }
+
+ ///
+ ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
+ {
+ throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
+ }
+ }
+#endif
+
+#if FUNCTIONS_V3_OR_GREATER
+#pragma warning disable SA1201 // Elements should appear in the correct order
+ internal static ITargetScaler GetTargetScaler(DurabilityProvider durabilityProvider, string functionId, FunctionName functionName, string? connectionName, string hubName)
+#pragma warning restore SA1201 // Elements should appear in the correct order
+ {
+ if (durabilityProvider.TryGetTargetScaler(
+ functionId,
+ functionName.Name,
+ hubName,
+ connectionName,
+ out ITargetScaler targetScaler))
+ {
+ return targetScaler;
+ }
+ else
+ {
+ // the durability provider does not support target-based scaling.
+ // Create an empty target scaler to avoid exceptions (unless target-based scaling is actually turned on).
+ return new NoOpTargetScaler(functionId);
+ }
+ }
+
+ internal sealed class NoOpTargetScaler : ITargetScaler
+ {
+ ///
+ /// Construct a placeholder target scaler.
+ ///
+ /// The function ID.
+ public NoOpTargetScaler(string functionId)
+ {
+ this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId);
+ }
+
+ public TargetScalerDescriptor TargetScalerDescriptor { get; }
+
+ public Task GetScaleResultAsync(TargetScalerContext context)
+ {
+ throw new NotSupportedException("The current DurableTask backend configuration does not support target-based scaling");
+ }
+ }
+#endif
+ }
+}
diff --git a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
index 8ee4cb106..b057bfdba 100644
--- a/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
+++ b/src/WebJobs.Extensions.DurableTask/WebJobs.Extensions.DurableTask.csproj
@@ -89,7 +89,7 @@
-
+
diff --git a/test/Common/TestHelpers.cs b/test/Common/TestHelpers.cs
index ec1fd043c..dafa10e5d 100644
--- a/test/Common/TestHelpers.cs
+++ b/test/Common/TestHelpers.cs
@@ -13,9 +13,8 @@
using Microsoft.ApplicationInsights.Channel;
#if !FUNCTIONS_V1
using Microsoft.Extensions.Hosting;
+using Microsoft.Azure.WebJobs.Host.Scale;
#endif
-using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
-using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -67,6 +66,9 @@ public static ITestHost GetJobHost(
int entityMessageReorderWindowInMinutes = 30,
string exactTaskHubName = null,
bool addDurableClientFactory = false,
+#if !FUNCTIONS_V1
+ Action configureScaleOptions = null,
+#endif
Type[] types = null)
{
switch (storageProviderType)
@@ -160,6 +162,7 @@ public static ITestHost GetJobHost(
#if !FUNCTIONS_V1
addDurableClientFactory: addDurableClientFactory,
types: types,
+ configureScaleOptions: configureScaleOptions,
#endif
durabilityProviderFactoryType: durabilityProviderFactoryType);
}
@@ -175,6 +178,9 @@ public static ITestHost GetJobHostWithOptions(
Action onSend = null,
Type durabilityProviderFactoryType = null,
bool addDurableClientFactory = false,
+#if !FUNCTIONS_V1
+ Action configureScaleOptions = null,
+#endif
Type[] types = null)
{
if (serializerSettings == null)
@@ -198,6 +204,7 @@ public static ITestHost GetJobHostWithOptions(
durabilityProviderFactoryType: durabilityProviderFactoryType,
addDurableClientFactory: addDurableClientFactory,
typeLocator: typeLocator,
+ configureScaleOptions: configureScaleOptions,
#endif
loggerProvider: loggerProvider,
nameResolver: testNameResolver,
diff --git a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs
index 930ff4a1c..57566da29 100644
--- a/test/FunctionsV2/DurableTaskScaleMonitorTests.cs
+++ b/test/FunctionsV2/DurableTaskScaleMonitorTests.cs
@@ -40,6 +40,12 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
ILogger logger = this.loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask"));
this.traceHelper = new EndToEndTraceHelper(logger, false);
this.performanceMonitor = new Mock(MockBehavior.Strict, this.storageAccount, this.hubName, (int?)null);
+ var metricsProvider = new DurableTaskMetricsProvider(
+ this.functionName.Name,
+ this.hubName,
+ logger,
+ this.performanceMonitor.Object,
+ this.storageAccount);
this.scaleMonitor = new DurableTaskScaleMonitor(
this.functionId,
@@ -47,6 +53,7 @@ public DurableTaskScaleMonitorTests(ITestOutputHelper output)
this.hubName,
this.storageAccount,
logger,
+ metricsProvider,
this.performanceMonitor.Object);
}
diff --git a/test/FunctionsV2/DurableTaskTargetScalerTests.cs b/test/FunctionsV2/DurableTaskTargetScalerTests.cs
new file mode 100644
index 000000000..cc2272e16
--- /dev/null
+++ b/test/FunctionsV2/DurableTaskTargetScalerTests.cs
@@ -0,0 +1,179 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See LICENSE in the project root for license information.
+
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using DurableTask.AzureStorage.Monitoring;
+using DurableTask.Core;
+using Microsoft.Azure.WebJobs.Extensions.DurableTask;
+using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
+using Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests;
+using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Azure.WebJobs.Host.TestCommon;
+using Microsoft.Azure.WebJobs.Logging;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.WindowsAzure.Storage;
+using Moq;
+using Xunit;
+using Xunit.Abstractions;
+using static Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale.ScaleUtils;
+using static Microsoft.Azure.WebJobs.Extensions.DurableTask.Tests.PlatformSpecificHelpers;
+
+namespace WebJobs.Extensions.DurableTask.Tests.V2
+{
+ public class DurableTaskTargetScalerTests
+ {
+ private readonly DurableTaskTargetScaler targetScaler;
+ private readonly TargetScalerContext scalerContext;
+ private readonly Mock metricsProviderMock;
+ private readonly Mock triggerMetricsMock;
+ private readonly Mock orchestrationServiceMock;
+ private readonly Mock durabilityProviderMock;
+ private readonly TestLoggerProvider loggerProvider;
+ private readonly ITestOutputHelper output;
+
+ public DurableTaskTargetScalerTests(ITestOutputHelper output)
+ {
+ this.scalerContext = new TargetScalerContext();
+ this.output = output;
+ var loggerFactory = new LoggerFactory();
+ this.loggerProvider = new TestLoggerProvider(this.output);
+ loggerFactory.AddProvider(this.loggerProvider);
+ ILogger logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("DurableTask"));
+
+ DisconnectedPerformanceMonitor nullPerformanceMonitorMock = null;
+ CloudStorageAccount nullCloudStorageAccountMock = null;
+ this.metricsProviderMock = new Mock(
+ MockBehavior.Strict,
+ "FunctionName",
+ "HubName",
+ logger,
+ nullPerformanceMonitorMock,
+ nullCloudStorageAccountMock);
+
+ this.triggerMetricsMock = new Mock(MockBehavior.Strict);
+ this.orchestrationServiceMock = new Mock(MockBehavior.Strict);
+
+ this.durabilityProviderMock = new Mock(
+ MockBehavior.Strict,
+ "storageProviderName",
+ this.orchestrationServiceMock.Object,
+ new Mock().Object,
+ "connectionName");
+
+ this.targetScaler = new DurableTaskTargetScaler(
+ "FunctionId",
+ this.metricsProviderMock.Object,
+ this.durabilityProviderMock.Object,
+ logger);
+ }
+
+ [Theory]
+ [Trait("Category", PlatformSpecificHelpers.TestCategory)]
+ [InlineData(1, 10, 10, "[1, 1, 1, 1]", 10)]
+ [InlineData(1, 10, 0, "[0, 0, 0, 0]", 0)]
+ [InlineData(1, 10, 0, "[2, 2, 3, 3]", 1)]
+ [InlineData(1, 10, 0, "[9999, 0, 0, 0]", 1)]
+ [InlineData(1, 10, 0, "[9999, 0, 0, 1]", 2)]
+ [InlineData(10, 10, 10, "[2, 2, 3, 3 ]", 1)]
+ [InlineData(10, 10, 30, "[10, 10, 10, 1]", 4)]
+ public async Task TestTargetScaler(int maxConcurrentActivities, int maxConcurrentOrchestrators, int workItemQueueLength, string controlQueueLengths, int expectedWorkerCount)
+ {
+ this.orchestrationServiceMock.SetupGet(m => m.MaxConcurrentTaskActivityWorkItems).Returns(maxConcurrentActivities);
+ this.orchestrationServiceMock.SetupGet(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(maxConcurrentOrchestrators);
+
+ this.triggerMetricsMock.SetupGet(m => m.WorkItemQueueLength).Returns(workItemQueueLength);
+ this.triggerMetricsMock.SetupGet(m => m.ControlQueueLengths).Returns(controlQueueLengths);
+
+ this.metricsProviderMock.Setup(m => m.GetMetricsAsync()).ReturnsAsync(this.triggerMetricsMock.Object);
+
+ var scaleResult = await this.targetScaler.GetScaleResultAsync(this.scalerContext);
+ var targetWorkerCount = scaleResult.TargetWorkerCount;
+ Assert.Equal(expectedWorkerCount, targetWorkerCount);
+ }
+
+ [Theory]
+ [Trait("Category", PlatformSpecificHelpers.TestCategory)]
+ [InlineData(true)]
+ [InlineData(false)]
+ public void TestGetTargetScaler(bool supportsTBS)
+ {
+ ITargetScaler targetScaler = new Mock().Object;
+ this.durabilityProviderMock.Setup(m => m.TryGetTargetScaler(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out targetScaler))
+ .Returns(supportsTBS);
+
+ var scaler = ScaleUtils.GetTargetScaler(this.durabilityProviderMock.Object, "FunctionId", new FunctionName("FunctionName"), "connectionName", "HubName");
+ if (!supportsTBS)
+ {
+ Assert.IsType(scaler);
+ Assert.ThrowsAsync(() => scaler.GetScaleResultAsync(context: null));
+ }
+ else
+ {
+ Assert.Equal(targetScaler, scaler);
+ }
+ }
+
+ [Theory]
+ [Trait("Category", PlatformSpecificHelpers.TestCategory)]
+ [InlineData(true)]
+ [InlineData(false)]
+ public void TestGetScaleMonitor(bool supportsScaleMonitor)
+ {
+ IScaleMonitor scaleMonitor = new Mock().Object;
+ this.durabilityProviderMock.Setup(m => m.TryGetScaleMonitor(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out scaleMonitor))
+ .Returns(supportsScaleMonitor);
+
+ var monitor = ScaleUtils.GetScaleMonitor(this.durabilityProviderMock.Object, "FunctionId", new FunctionName("FunctionName"), "connectionName", "HubName");
+ if (!supportsScaleMonitor)
+ {
+ Assert.IsType(monitor);
+ Assert.Throws(() => monitor.GetScaleStatus(context: null));
+ }
+ else
+ {
+ Assert.Equal(scaleMonitor, monitor);
+ }
+ }
+
+ [Theory]
+ [Trait("Category", PlatformSpecificHelpers.TestCategory)]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async void ScaleHostE2ETest(bool isTbsEnabled)
+ {
+ Action configureScaleOptions = (scaleOptions) =>
+ {
+ scaleOptions.IsTargetScalingEnabled = isTbsEnabled;
+ scaleOptions.MetricsPurgeEnabled = false;
+ scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4);
+ scaleOptions.IsRuntimeScalingEnabled = true;
+ scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1);
+ };
+ using (FunctionsV2HostWrapper host = (FunctionsV2HostWrapper)TestHelpers.GetJobHost(this.loggerProvider, nameof(this.ScaleHostE2ETest), enableExtendedSessions: false, configureScaleOptions: configureScaleOptions))
+ {
+ await host.StartAsync();
+
+ IScaleStatusProvider scaleManager = host.InnerHost.Services.GetService();
+ var client = await host.StartOrchestratorAsync(nameof(TestOrchestrations.FanOutFanIn), 50, this.output);
+ var status = await client.WaitForCompletionAsync(this.output, timeout: TimeSpan.FromSeconds(400));
+ var scaleStatus = await scaleManager.GetScaleStatusAsync(new ScaleStatusContext());
+ await host.StopAsync();
+ Assert.Equal(OrchestrationRuntimeStatus.Completed, status?.RuntimeStatus);
+
+ // We inspect the Host's logs for evidence that the Host is correctly sampling our scaling requests.
+ // the expected logs depend on whether TBS is enabled or not
+ var expectedSubString = "scale monitors to sample";
+ if (isTbsEnabled)
+ {
+ expectedSubString = "target scalers to sample";
+ }
+
+ bool containsExpectedLog = this.loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage ?? "").Any(p => p.Contains(expectedSubString));
+ Assert.True(containsExpectedLog);
+ }
+ }
+ }
+}
diff --git a/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs b/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs
index bfd72124f..4b2d46456 100644
--- a/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs
+++ b/test/FunctionsV2/PlatformSpecificHelpers.FunctionsV2.cs
@@ -8,7 +8,7 @@
using System.Threading.Tasks;
using Microsoft.ApplicationInsights.Channel;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
-using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
+using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
@@ -37,7 +37,8 @@ public static ITestHost CreateJobHost(
IMessageSerializerSettingsFactory serializerSettingsFactory,
Action onSend,
bool addDurableClientFactory,
- ITypeLocator typeLocator)
+ ITypeLocator typeLocator,
+ Action configureScaleOptions = null)
{
// Unless otherwise specified, use legacy partition management for tests as it makes the task hubs start up faster.
// These tests run on a single task hub workers, so they don't test partition management anyways, and that is tested
@@ -47,7 +48,7 @@ public static ITestHost CreateJobHost(
options.Value.StorageProvider.Add(nameof(AzureStorageOptions.UseLegacyPartitionManagement), true);
}
- IHost host = new HostBuilder()
+ var hostBuilder = new HostBuilder()
.ConfigureLogging(
loggingBuilder =>
{
@@ -97,9 +98,22 @@ public static ITestHost CreateJobHost(
return telemetryActivator;
});
}
- })
- .Build();
+ });
+
+ // if a configureScaleOptions action is provided, then we're probably trying to test the host's scaling logic
+ // we configure WebJobsScale and set the minimum logging level to `Debug`, as scaling logs are usually at the `Debug` level
+ if (configureScaleOptions != null)
+ {
+ hostBuilder.ConfigureWebJobsScale(
+ (context, builder) =>
+ {
+ // ignore
+ },
+ configureScaleOptions)
+ .ConfigureLogging(builder => builder.SetMinimumLevel(LogLevel.Debug));
+ }
+ var host = hostBuilder.Build();
return new FunctionsV2HostWrapper(host, options, nameResolver);
}
@@ -216,7 +230,7 @@ private static IWebJobsBuilder AddEmulatorDurableTask(this IWebJobsBuilder build
internal class FunctionsV2HostWrapper : ITestHost
{
- private readonly IHost innerHost;
+ internal readonly IHost InnerHost;
private readonly JobHost innerWebJobsHost;
private readonly DurableTaskOptions options;
private readonly INameResolver nameResolver;
@@ -226,8 +240,8 @@ public FunctionsV2HostWrapper(
IOptions options,
INameResolver nameResolver)
{
- this.innerHost = innerHost ?? throw new ArgumentNullException(nameof(innerHost));
- this.innerWebJobsHost = (JobHost)this.innerHost.Services.GetService();
+ this.InnerHost = innerHost ?? throw new ArgumentNullException(nameof(innerHost));
+ this.innerWebJobsHost = (JobHost)this.InnerHost.Services.GetService();
this.options = options.Value;
this.nameResolver = nameResolver;
}
@@ -236,8 +250,8 @@ internal FunctionsV2HostWrapper(
IHost innerHost,
IOptions options)
{
- this.innerHost = innerHost;
- this.innerWebJobsHost = (JobHost)this.innerHost.Services.GetService();
+ this.InnerHost = innerHost;
+ this.innerWebJobsHost = (JobHost)this.InnerHost.Services.GetService();
this.options = options.Value;
}
@@ -249,16 +263,16 @@ public Task CallAsync(MethodInfo method, IDictionary args)
public void Dispose()
{
- this.innerHost.Dispose();
+ this.InnerHost.Dispose();
}
- public Task StartAsync() => this.innerHost.StartAsync();
+ public Task StartAsync() => this.InnerHost.StartAsync();
public async Task StopAsync()
{
try
{
- await this.innerHost.StopAsync();
+ await this.InnerHost.StopAsync();
}
catch (OperationCanceledException)
{
diff --git a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj
index 4840ca88a..ec2d5dd68 100644
--- a/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj
+++ b/test/FunctionsV2/WebJobs.Extensions.DurableTask.Tests.V2.csproj
@@ -16,7 +16,7 @@
-
+