From dca1f9ba1b23e08e8dda900a2c9fcb1729c54451 Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Wed, 3 May 2023 12:13:31 -0700 Subject: [PATCH 01/13] initial commit --- .../NetheriteProvider.cs | 17 ++++ .../NetheriteTargetScaler.cs | 41 ++++++++ .../NetheriteOrchestrationService.cs | 20 +++- .../Scaling/NetheriteMetricsProvider.cs | 95 +++++++++++++++++++ .../Scaling/ScalingMonitor.cs | 72 ++------------ 5 files changed, 175 insertions(+), 70 deletions(-) create mode 100644 src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs create mode 100644 src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs index 06bb05b..ae04a3d 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs @@ -130,6 +130,23 @@ public override bool TryGetScaleMonitor( } } +#if !NETSTANDARD + public override bool TryGetTargetScaler( + string functionId, + string functionName, + string hubName, + string connectionName, + out ITargetScaler targetScaler) + { + ILoadPublisherService loadPublisher = this.Service.GetLoadPublisher(); + NetheriteMetricsProvider metricsProvider = this.Service.GetNetheriteMetricsProvider(loadPublisher, this.Settings.EventHubsConnection); + + targetScaler = new NetheriteTargetScaler(functionId, metricsProvider, this); + + return true; + } +#endif + public class NetheriteScaleMetrics : ScaleMetrics { public byte[] Metrics { get; set; } diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs new file mode 100644 index 0000000..629043c --- /dev/null +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#if !NETSTANDARD +#if !NETCOREAPP2_2 +namespace DurableTask.Netherite.AzureFunctions +{ + using System; + using System.Threading.Tasks; + using DurableTask.Netherite.Scaling; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Azure.WebJobs.Host.Scale; + + class NetheriteTargetScaler : ITargetScaler + { + readonly NetheriteMetricsProvider metricsProvider; + readonly DurabilityProvider durabilityProvider; + readonly TargetScalerResult scaleResult; + + public NetheriteTargetScaler( + string functionId, + NetheriteMetricsProvider metricsProvider, + DurabilityProvider durabilityProvider) + { + this.metricsProvider = metricsProvider; + this.durabilityProvider = durabilityProvider; + this.scaleResult = new TargetScalerResult(); + this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId); + } + + public TargetScalerDescriptor TargetScalerDescriptor { get; private set; } + + public Task GetScaleResultAsync(TargetScalerContext context) + { + // Refer to GetScaleRecommendation() in ScaleMonitor + throw new NotImplementedException(); + } + } +} +#endif +#endif \ No newline at end of file diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index fed5bb6..84e8e68 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -212,9 +212,9 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor) { try { - ILoadPublisherService loadPublisher = string.IsNullOrEmpty(this.Settings.LoadInformationAzureTableName) ? - new AzureBlobLoadPublisher(this.Settings.BlobStorageConnection, this.Settings.HubName) - : new AzureTableLoadPublisher(this.Settings.TableStorageConnection, this.Settings.LoadInformationAzureTableName, this.Settings.HubName); + ILoadPublisherService loadPublisher = this.GetLoadPublisher(); + + NetheriteMetricsProvider netheriteMetricsProvider = this.GetNetheriteMetricsProvider(loadPublisher, this.Settings.EventHubsConnection); monitor = new ScalingMonitor( loadPublisher, @@ -223,7 +223,8 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor) this.Settings.HubName, this.TraceHelper.TraceScaleRecommendation, this.TraceHelper.TraceProgress, - this.TraceHelper.TraceError); + this.TraceHelper.TraceError, + netheriteMetricsProvider); return true; } @@ -237,6 +238,17 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor) return false; } + internal ILoadPublisherService GetLoadPublisher() + { + return string.IsNullOrEmpty(this.Settings.LoadInformationAzureTableName) ? + new AzureBlobLoadPublisher(this.Settings.BlobStorageConnection, this.Settings.HubName) + : new AzureTableLoadPublisher(this.Settings.TableStorageConnection, this.Settings.LoadInformationAzureTableName, this.Settings.HubName); + } + + internal NetheriteMetricsProvider GetNetheriteMetricsProvider(ILoadPublisherService loadPublisher, ConnectionInfo eventHubsConnection) + { + return new NetheriteMetricsProvider(loadPublisher, eventHubsConnection); + } public void WatchThreads(object _) { diff --git a/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs b/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs new file mode 100644 index 0000000..b3d793a --- /dev/null +++ b/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs @@ -0,0 +1,95 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite.Scaling +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using DurableTask.Netherite.EventHubsTransport; + using static DurableTask.Netherite.Scaling.ScalingMonitor; + + public class NetheriteMetricsProvider + { + readonly ILoadPublisherService loadPublisher; + readonly ConnectionInfo eventHubsConnection; + + public NetheriteMetricsProvider( + ILoadPublisherService loadPublisher, + ConnectionInfo eventHubsConnection) + { + this.loadPublisher = loadPublisher; + this.eventHubsConnection = eventHubsConnection; + } + + public async Task GetMetricsAsync() + { + DateTime now = DateTime.UtcNow; + var loadInformation = await this.loadPublisher.QueryAsync(CancellationToken.None).ConfigureAwait(false); + var busy = await this.TaskHubIsIdleAsync(loadInformation).ConfigureAwait(false); + + return new Metrics() + { + LoadInformation = loadInformation, + Busy = busy, + Timestamp = now, + }; + } + + /// + /// Determines if a taskhub is busy, based on load information for the partitions and on the eventhubs queue positions + /// + /// + /// null if the hub is idle, or a string describing the current non-idle state + public async Task TaskHubIsIdleAsync(Dictionary loadInformation) + { + // first, check if any of the partitions have queued work or are scheduled to wake up + foreach (var kvp in loadInformation) + { + string busy = kvp.Value.IsBusy(); + if (!string.IsNullOrEmpty(busy)) + { + return $"P{kvp.Key:D2} {busy}"; + } + } + + // next, check if any of the entries are not current, in the sense that their input queue position + // does not match the latest queue position + + + List positions = await Netherite.EventHubsTransport.EventHubsConnections.GetQueuePositionsAsync(this.eventHubsConnection, EventHubsTransport.PartitionHub).ConfigureAwait(false); + + if (positions == null) + { + return "eventhubs is missing"; + } + + for (int i = 0; i < positions.Count; i++) + { + if (!loadInformation.TryGetValue((uint)i, out var loadInfo)) + { + return $"P{i:D2} has no load information published yet"; + } + if (positions[i] > loadInfo.InputQueuePosition) + { + return $"P{i:D2} has input queue position {loadInfo.InputQueuePosition} which is {positions[(int)i] - loadInfo.InputQueuePosition} behind latest position {positions[i]}"; + } + } + + // finally, check if we have waited long enough + foreach (var kvp in loadInformation) + { + string latencyTrend = kvp.Value.LatencyTrend; + + if (!PartitionLoadInfo.IsLongIdle(latencyTrend)) + { + return $"P{kvp.Key:D2} had some activity recently, latency trend is {latencyTrend}"; + } + } + + // we have concluded that there are no pending work items, timers, or unprocessed input queue entries + return null; + } + } +} diff --git a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs index 37f14cc..4f31a41 100644 --- a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs +++ b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs @@ -22,6 +22,7 @@ public class ScalingMonitor readonly string partitionLoadTableName; readonly string taskHubName; readonly ILoadPublisherService loadPublisher; + readonly NetheriteMetricsProvider netheriteMetricsProvider; // public logging actions to enable collection of scale-monitor-related logging within the Netherite infrastructure public Action RecommendationTracer { get; } @@ -47,7 +48,8 @@ public ScalingMonitor( string taskHubName, Action recommendationTracer, Action informationTracer, - Action errorTracer) + Action errorTracer, + NetheriteMetricsProvider netheriteMetricsProvider) { this.RecommendationTracer = recommendationTracer; this.InformationTracer = informationTracer; @@ -57,6 +59,8 @@ public ScalingMonitor( this.eventHubsConnection = eventHubsConnection; this.partitionLoadTableName = partitionLoadTableName; this.taskHubName = taskHubName; + + this.netheriteMetricsProvider = netheriteMetricsProvider; } /// @@ -96,16 +100,7 @@ public struct Metrics /// The collected metrics. public async Task CollectMetrics() { - DateTime now = DateTime.UtcNow; - var loadInformation = await this.loadPublisher.QueryAsync(CancellationToken.None).ConfigureAwait(false); - var busy = await this.TaskHubIsIdleAsync(loadInformation).ConfigureAwait(false); - - return new Metrics() - { - LoadInformation = loadInformation, - Busy = busy, - Timestamp = now, - }; + return await this.netheriteMetricsProvider.GetMetricsAsync(); } /// @@ -195,60 +190,5 @@ bool isSlowPartition(PartitionLoadInfo info) return new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: $"Partition latencies are healthy"); } } - - /// - /// Determines if a taskhub is busy, based on load information for the partitions and on the eventhubs queue positions - /// - /// - /// null if the hub is idle, or a string describing the current non-idle state - public async Task TaskHubIsIdleAsync(Dictionary loadInformation) - { - // first, check if any of the partitions have queued work or are scheduled to wake up - foreach (var kvp in loadInformation) - { - string busy = kvp.Value.IsBusy(); - if (!string.IsNullOrEmpty(busy)) - { - return $"P{kvp.Key:D2} {busy}"; - } - } - - // next, check if any of the entries are not current, in the sense that their input queue position - // does not match the latest queue position - - - List positions = await Netherite.EventHubsTransport.EventHubsConnections.GetQueuePositionsAsync(this.eventHubsConnection, EventHubsTransport.PartitionHub).ConfigureAwait(false); - - if (positions == null) - { - return "eventhubs is missing"; - } - - for (int i = 0; i < positions.Count; i++) - { - if (!loadInformation.TryGetValue((uint) i, out var loadInfo)) - { - return $"P{i:D2} has no load information published yet"; - } - if (positions[i] > loadInfo.InputQueuePosition) - { - return $"P{i:D2} has input queue position {loadInfo.InputQueuePosition} which is {positions[(int)i] - loadInfo.InputQueuePosition} behind latest position {positions[i]}"; - } - } - - // finally, check if we have waited long enough - foreach (var kvp in loadInformation) - { - string latencyTrend = kvp.Value.LatencyTrend; - - if (!PartitionLoadInfo.IsLongIdle(latencyTrend)) - { - return $"P{kvp.Key:D2} had some activity recently, latency trend is {latencyTrend}"; - } - } - - // we have concluded that there are no pending work items, timers, or unprocessed input queue entries - return null; - } } } From c65fadfad7d2c7a2fd6eac386c60a4845af88ec9 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Fri, 5 May 2023 10:35:48 -0700 Subject: [PATCH 02/13] port the scaling logic to compute a target instead of a scale recommendation. --- .../NetheriteTargetScaler.cs | 2 +- .../Scaling/ScalingMonitor.cs | 70 +++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs index 629043c..56117dc 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs @@ -32,7 +32,7 @@ public NetheriteTargetScaler( public Task GetScaleResultAsync(TargetScalerContext context) { - // Refer to GetScaleRecommendation() in ScaleMonitor + // Refer to GetTargetRecommendation() in ScaleMonitor throw new NotImplementedException(); } } diff --git a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs index 4f31a41..8eb06e4 100644 --- a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs +++ b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs @@ -190,5 +190,75 @@ bool isSlowPartition(PartitionLoadInfo info) return new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: $"Partition latencies are healthy"); } } + + /// + /// Makes a target recommendation. + /// + /// The recommended number of workers. + public int GetTargetRecommendation(Metrics metrics, int maxConcurrentActivities, int maxConcurrentWorkItems) + { + if (metrics.TaskHubIsIdle) + { + return 0; // we need no workers + } + + int target = 1; // always need at least one worker when we are not idle + + // if there is a backlog of activities, ask for enough workers to process them + int activities = metrics.LoadInformation.Where(info => info.Value.IsLoaded()).Sum(info => info.Value.Activities); + if (activities > 0) + { + int requestedWorkers = (activities / maxConcurrentActivities) + 1; + requestedWorkers = Math.Min(requestedWorkers, metrics.LoadInformation.Count); // cannot use more workers than partitions + target = Math.Max(target, requestedWorkers); + } + + // if there are load-challenged partitions, ask for a worker for each of them + int numberOfChallengedPartitions = metrics.LoadInformation.Values + .Count(info => info.IsLoaded() || info.WorkItems > maxConcurrentWorkItems); + target = Math.Max(target, numberOfChallengedPartitions); + + // Determine how many different workers are currently running + int current = metrics.LoadInformation.Values.Select(info => info.WorkerId).Distinct().Count(); + + if (target < current) + { + // the target is lower than our current scale. However, before + // scaling in, we check some more things to avoid + // over-aggressive scale-in that could impact performance negatively. + + int numberOfNonIdlePartitions = metrics.LoadInformation.Values.Count(info => !PartitionLoadInfo.IsLongIdle(info.LatencyTrend)); + if (current > numberOfNonIdlePartitions) + { + // if we have more workers than non-idle partitions, don't immediately go lower than + // the number of non-idle partitions. + target = Math.Max(target, numberOfNonIdlePartitions); + } + else + { + // All partitions are busy, so so we don't want to reduce the worker count unless load is very low. + // Even if all partitions are runnning efficiently, it can be hard to know whether it is wise to reduce the worker count. + // We want to avoid scaling in unnecessarily when we've reached optimal scale-out. + // But we also want to avoid the case where a constant trickle of load after a big scale-out prevents scaling back in. + // To balance these goals, we vote to scale down only by one worker at a time when we see this situation. + bool allPartitionsAreFast = !metrics.LoadInformation.Values.Any( info => + info.LatencyTrend.Length != PartitionLoadInfo.LatencyTrendLength + || info.LatencyTrend.Any(c => c == PartitionLoadInfo.MediumLatency || c == PartitionLoadInfo.HighLatency)); + + if (allPartitionsAreFast) + { + // don't go lower than 1 below current + target = Math.Max(target, current - 1); + } + else + { + // don't go lower than current + target = Math.Max(target, current); + } + } + } + + return target; + } } } From bae7c8fcb009ed5123454998bedd7eef3894f485 Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Tue, 9 May 2023 09:35:45 -0700 Subject: [PATCH 03/13] moved target based scaling code to NetheriteTargetScaler --- ...urableTask.Netherite.AzureFunctions.csproj | 4 +- .../NetheriteTargetScaler.cs | 75 ++++++++++++++++++- .../DurableTask.Netherite.csproj | 2 +- .../Scaling/ScalingMonitor.cs | 70 ----------------- 4 files changed, 76 insertions(+), 75 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj index 23bbf5a..f63aa6f 100644 --- a/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj +++ b/src/DurableTask.Netherite.AzureFunctions/DurableTask.Netherite.AzureFunctions.csproj @@ -51,8 +51,8 @@ - - + + diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs index 56117dc..f3fa6ec 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs @@ -6,10 +6,12 @@ namespace DurableTask.Netherite.AzureFunctions { using System; + using System.Linq; using System.Threading.Tasks; using DurableTask.Netherite.Scaling; using Microsoft.Azure.WebJobs.Extensions.DurableTask; using Microsoft.Azure.WebJobs.Host.Scale; + using static DurableTask.Netherite.Scaling.ScalingMonitor; class NetheriteTargetScaler : ITargetScaler { @@ -30,10 +32,79 @@ public NetheriteTargetScaler( public TargetScalerDescriptor TargetScalerDescriptor { get; private set; } - public Task GetScaleResultAsync(TargetScalerContext context) + public async Task GetScaleResultAsync(TargetScalerContext context) { // Refer to GetTargetRecommendation() in ScaleMonitor - throw new NotImplementedException(); + Metrics metrics = await this.metricsProvider.GetMetricsAsync(); + + int maxConcurrentActivities = this.durabilityProvider.MaxConcurrentTaskActivityWorkItems; + int maxConcurrentWorkItems = this.durabilityProvider.MaxConcurrentTaskOrchestrationWorkItems; + + int target; + + if (metrics.TaskHubIsIdle) + { + target = 0; // we need no workers + } + + target = 1; // always need at least one worker when we are not idle + + // if there is a backlog of activities, ask for enough workers to process them + int activities = metrics.LoadInformation.Where(info => info.Value.IsLoaded()).Sum(info => info.Value.Activities); + if (activities > 0) + { + int requestedWorkers = (activities / maxConcurrentActivities) + 1; + requestedWorkers = Math.Min(requestedWorkers, metrics.LoadInformation.Count); // cannot use more workers than partitions + target = Math.Max(target, requestedWorkers); + } + + // if there are load-challenged partitions, ask for a worker for each of them + int numberOfChallengedPartitions = metrics.LoadInformation.Values + .Count(info => info.IsLoaded() || info.WorkItems > maxConcurrentWorkItems); + target = Math.Max(target, numberOfChallengedPartitions); + + // Determine how many different workers are currently running + int current = metrics.LoadInformation.Values.Select(info => info.WorkerId).Distinct().Count(); + + if (target < current) + { + // the target is lower than our current scale. However, before + // scaling in, we check some more things to avoid + // over-aggressive scale-in that could impact performance negatively. + + int numberOfNonIdlePartitions = metrics.LoadInformation.Values.Count(info => !PartitionLoadInfo.IsLongIdle(info.LatencyTrend)); + if (current > numberOfNonIdlePartitions) + { + // if we have more workers than non-idle partitions, don't immediately go lower than + // the number of non-idle partitions. + target = Math.Max(target, numberOfNonIdlePartitions); + } + else + { + // All partitions are busy, so so we don't want to reduce the worker count unless load is very low. + // Even if all partitions are runnning efficiently, it can be hard to know whether it is wise to reduce the worker count. + // We want to avoid scaling in unnecessarily when we've reached optimal scale-out. + // But we also want to avoid the case where a constant trickle of load after a big scale-out prevents scaling back in. + // To balance these goals, we vote to scale down only by one worker at a time when we see this situation. + bool allPartitionsAreFast = !metrics.LoadInformation.Values.Any(info => + info.LatencyTrend.Length != PartitionLoadInfo.LatencyTrendLength + || info.LatencyTrend.Any(c => c == PartitionLoadInfo.MediumLatency || c == PartitionLoadInfo.HighLatency)); + + if (allPartitionsAreFast) + { + // don't go lower than 1 below current + target = Math.Max(target, current - 1); + } + else + { + // don't go lower than current + target = Math.Max(target, current); + } + } + } + + this.scaleResult.TargetWorkerCount = target; + return this.scaleResult; } } } diff --git a/src/DurableTask.Netherite/DurableTask.Netherite.csproj b/src/DurableTask.Netherite/DurableTask.Netherite.csproj index 3bd42f1..2399485 100644 --- a/src/DurableTask.Netherite/DurableTask.Netherite.csproj +++ b/src/DurableTask.Netherite/DurableTask.Netherite.csproj @@ -57,7 +57,7 @@ - + diff --git a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs index 8eb06e4..4f31a41 100644 --- a/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs +++ b/src/DurableTask.Netherite/Scaling/ScalingMonitor.cs @@ -190,75 +190,5 @@ bool isSlowPartition(PartitionLoadInfo info) return new ScaleRecommendation(ScaleAction.None, keepWorkersAlive: true, reason: $"Partition latencies are healthy"); } } - - /// - /// Makes a target recommendation. - /// - /// The recommended number of workers. - public int GetTargetRecommendation(Metrics metrics, int maxConcurrentActivities, int maxConcurrentWorkItems) - { - if (metrics.TaskHubIsIdle) - { - return 0; // we need no workers - } - - int target = 1; // always need at least one worker when we are not idle - - // if there is a backlog of activities, ask for enough workers to process them - int activities = metrics.LoadInformation.Where(info => info.Value.IsLoaded()).Sum(info => info.Value.Activities); - if (activities > 0) - { - int requestedWorkers = (activities / maxConcurrentActivities) + 1; - requestedWorkers = Math.Min(requestedWorkers, metrics.LoadInformation.Count); // cannot use more workers than partitions - target = Math.Max(target, requestedWorkers); - } - - // if there are load-challenged partitions, ask for a worker for each of them - int numberOfChallengedPartitions = metrics.LoadInformation.Values - .Count(info => info.IsLoaded() || info.WorkItems > maxConcurrentWorkItems); - target = Math.Max(target, numberOfChallengedPartitions); - - // Determine how many different workers are currently running - int current = metrics.LoadInformation.Values.Select(info => info.WorkerId).Distinct().Count(); - - if (target < current) - { - // the target is lower than our current scale. However, before - // scaling in, we check some more things to avoid - // over-aggressive scale-in that could impact performance negatively. - - int numberOfNonIdlePartitions = metrics.LoadInformation.Values.Count(info => !PartitionLoadInfo.IsLongIdle(info.LatencyTrend)); - if (current > numberOfNonIdlePartitions) - { - // if we have more workers than non-idle partitions, don't immediately go lower than - // the number of non-idle partitions. - target = Math.Max(target, numberOfNonIdlePartitions); - } - else - { - // All partitions are busy, so so we don't want to reduce the worker count unless load is very low. - // Even if all partitions are runnning efficiently, it can be hard to know whether it is wise to reduce the worker count. - // We want to avoid scaling in unnecessarily when we've reached optimal scale-out. - // But we also want to avoid the case where a constant trickle of load after a big scale-out prevents scaling back in. - // To balance these goals, we vote to scale down only by one worker at a time when we see this situation. - bool allPartitionsAreFast = !metrics.LoadInformation.Values.Any( info => - info.LatencyTrend.Length != PartitionLoadInfo.LatencyTrendLength - || info.LatencyTrend.Any(c => c == PartitionLoadInfo.MediumLatency || c == PartitionLoadInfo.HighLatency)); - - if (allPartitionsAreFast) - { - // don't go lower than 1 below current - target = Math.Max(target, current - 1); - } - else - { - // don't go lower than current - target = Math.Max(target, current); - } - } - } - - return target; - } } } From 583d2de97e29ed6f17a8827024908c8114310697 Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Wed, 17 May 2023 11:27:13 -0700 Subject: [PATCH 04/13] adding tests --- .../NetheriteTargetScaler.cs | 3 +- .../Scaling/NetheriteMetricsProvider.cs | 2 +- ...Task.Netherite.AzureFunctions.Tests.csproj | 3 +- .../TargetBasedScalingTests.cs | 100 ++++++++++++++++++ 4 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs index f3fa6ec..b76c5d7 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs @@ -13,7 +13,7 @@ namespace DurableTask.Netherite.AzureFunctions using Microsoft.Azure.WebJobs.Host.Scale; using static DurableTask.Netherite.Scaling.ScalingMonitor; - class NetheriteTargetScaler : ITargetScaler + public class NetheriteTargetScaler : ITargetScaler { readonly NetheriteMetricsProvider metricsProvider; readonly DurabilityProvider durabilityProvider; @@ -34,7 +34,6 @@ public NetheriteTargetScaler( public async Task GetScaleResultAsync(TargetScalerContext context) { - // Refer to GetTargetRecommendation() in ScaleMonitor Metrics metrics = await this.metricsProvider.GetMetricsAsync(); int maxConcurrentActivities = this.durabilityProvider.MaxConcurrentTaskActivityWorkItems; diff --git a/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs b/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs index b3d793a..c2dc077 100644 --- a/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs +++ b/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs @@ -23,7 +23,7 @@ public NetheriteMetricsProvider( this.eventHubsConnection = eventHubsConnection; } - public async Task GetMetricsAsync() + public virtual async Task GetMetricsAsync() { DateTime now = DateTime.UtcNow; var loadInformation = await this.loadPublisher.QueryAsync(CancellationToken.None).ConfigureAwait(false); diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index cd5d434..a1182e7 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -10,6 +10,7 @@ + all @@ -24,7 +25,7 @@ - + diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs new file mode 100644 index 0000000..9ed1d9d --- /dev/null +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs @@ -0,0 +1,100 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace DurableTask.Netherite.AzureFunctions.Tests +{ + using System; + using System.Collections.Generic; + using DurableTask.Core; + using DurableTask.Netherite.Scaling; + using DurableTask.Netherite.Tests; + using Microsoft.Azure.WebJobs.Extensions.DurableTask; + using Microsoft.Azure.WebJobs.Host.Scale; + using Microsoft.Extensions.Logging; + using Moq; + using Xunit; + using Xunit.Abstractions; + using static DurableTask.Netherite.Scaling.ScalingMonitor; + + public class TargetBasedScalingTests : IntegrationTestBase + { + readonly ILoggerFactory loggerFactory; + readonly Mock metricsProviderMock; + readonly Mock orchestrationServiceMock; + + public TargetBasedScalingTests(ITestOutputHelper output) : base(output) + { + this.loggerFactory = new LoggerFactory(); + var loggerProvider = new XunitLoggerProvider(); + this.loggerFactory.AddProvider(loggerProvider); + + this.orchestrationServiceMock = new Mock(MockBehavior.Strict); + + this.metricsProviderMock = new Mock( + MockBehavior.Strict, + null, + new ConnectionInfo()); + } + + [Theory] + [InlineData(10, 10, 2)] + [InlineData(20, 20, 2)] + public async void GetDurabilityProviderFactoryTest(int maxConcurrentTaskActivityWorkItems, int maxConcurrentTaskOrchestrationWorkItems, int expectedTargetWorkerCount) + { + this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(maxConcurrentTaskActivityWorkItems); + this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(maxConcurrentTaskOrchestrationWorkItems); + + Dictionary loadInformation = new Dictionary() + { + { 1, this.Create("A") }, + { 2, this.Create("B") }, + }; + + var testMetrics = new Metrics() + { + LoadInformation = loadInformation, + Busy = "busy", + Timestamp = DateTime.UtcNow, + }; + + var durabilityProviderMock = new Mock( + MockBehavior.Strict, + "storageProviderName", + this.orchestrationServiceMock.Object, + new Mock().Object, + "connectionName"); + + this.metricsProviderMock.Setup(m => m.GetMetricsAsync()).ReturnsAsync(testMetrics); + + NetheriteTargetScaler targetScaler = new NetheriteTargetScaler( + "functionId", + this.metricsProviderMock.Object, + durabilityProviderMock.Object); + + TargetScalerResult result = await targetScaler.GetScaleResultAsync(new TargetScalerContext()); + + Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount); + } + + PartitionLoadInfo Create(string worker) + { + return new PartitionLoadInfo() + { + WorkerId = worker, + Activities = 11, + CacheMB = 1.1, + CachePct = 33, + CommitLogPosition = 64, + InputQueuePosition = 1231, + Instances = 3, + LatencyTrend = "IIIII", + MissRate = 0.1, + Outbox = 44, + Requests = 55, + Timers = 66, + Wakeup = DateTime.Parse("2022-10-08T17:00:44.7400082Z").ToUniversalTime(), + WorkItems = 77 + }; + } + } +} From 91ff2ae670ff470313902105773754c813e8b32f Mon Sep 17 00:00:00 2001 From: Varshitha Bachu Date: Wed, 17 May 2023 11:34:19 -0700 Subject: [PATCH 05/13] updated test name --- .../TargetBasedScalingTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs index 9ed1d9d..3034733 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs @@ -39,12 +39,12 @@ public TargetBasedScalingTests(ITestOutputHelper output) : base(output) [Theory] [InlineData(10, 10, 2)] [InlineData(20, 20, 2)] - public async void GetDurabilityProviderFactoryTest(int maxConcurrentTaskActivityWorkItems, int maxConcurrentTaskOrchestrationWorkItems, int expectedTargetWorkerCount) + public async void TargetBasedScalingTest(int maxConcurrentTaskActivityWorkItems, int maxConcurrentTaskOrchestrationWorkItems, int expectedTargetWorkerCount) { this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(maxConcurrentTaskActivityWorkItems); this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(maxConcurrentTaskOrchestrationWorkItems); - Dictionary loadInformation = new Dictionary() + var loadInformation = new Dictionary() { { 1, this.Create("A") }, { 2, this.Create("B") }, From 53023d34f1cd7cfc8919605cf2caf5cd8262f0be Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 18 May 2023 17:24:10 -0700 Subject: [PATCH 06/13] add instanceId to descriptor (#270) --- .../NetheriteProvider.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs index ae04a3d..3556ea2 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs @@ -119,7 +119,7 @@ public override bool TryGetScaleMonitor( { if (this.Service.TryGetScalingMonitor(out var monitor)) { - scaleMonitor = new ScaleMonitor(monitor); + scaleMonitor = new ScaleMonitor(monitor, functionId); monitor.InformationTracer($"ScaleMonitor Constructed, Descriptor.Id={scaleMonitor.Descriptor.Id}"); return true; } @@ -159,10 +159,16 @@ class ScaleMonitor : IScaleMonitor readonly DataContractSerializer serializer = new DataContractSerializer(typeof(ScalingMonitor.Metrics)); static Tuple cachedMetrics; - public ScaleMonitor(ScalingMonitor scalingMonitor) + public ScaleMonitor(ScalingMonitor scalingMonitor, string functionId) { this.scalingMonitor = scalingMonitor; - this.descriptor = new ScaleMonitorDescriptor($"DurableTaskTrigger-Netherite-{this.scalingMonitor.TaskHubName}".ToLower()); + var descriptorId = $"DurableTaskTrigger-Netherite-{this.scalingMonitor.TaskHubName}".ToLower(); + +#if NETCOREAPP3_1_OR_GREATER + this.descriptor = new ScaleMonitorDescriptor(descriptorId, functionId); +#else + this.descriptor = new ScaleMonitorDescriptor(descriptorId); +#endif } public ScaleMonitorDescriptor Descriptor => this.descriptor; From 24fba84ce8ea4f5163753fa41d3d94afe417ff2e Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Tue, 11 Feb 2025 15:54:33 -0800 Subject: [PATCH 07/13] make netherite metrics provider singleton --- .../NetheriteProvider.cs | 13 +++++++--- .../NetheriteOrchestrationService.cs | 2 +- .../Scaling/NetheriteMetricsProvider.cs | 26 ++++++++++++++----- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs index 057b5e3..2fcfec2 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs @@ -50,6 +50,8 @@ public NetheriteProvider( public override string EventSourceName => "DurableTask-Netherite"; + NetheriteMetricsProvider metricsProvider; + /// public async override Task RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings) { @@ -130,7 +132,6 @@ public override bool TryGetScaleMonitor( } } -#if !NETSTANDARD public override bool TryGetTargetScaler( string functionId, string functionName, @@ -139,13 +140,17 @@ public override bool TryGetTargetScaler( out ITargetScaler targetScaler) { ILoadPublisherService loadPublisher = this.Service.GetLoadPublisher(); - NetheriteMetricsProvider metricsProvider = this.Service.GetNetheriteMetricsProvider(loadPublisher, this.Settings.EventHubsConnection); - targetScaler = new NetheriteTargetScaler(functionId, metricsProvider, this); + // Target Scaler is created per function id. And they share the same NetheriteMetricsProvider. + if ( this.metricsProvider == null) + { + this.metricsProvider = this.Service.GetNetheriteMetricsProvider(loadPublisher, this.Settings.EventHubsConnection); + } + + targetScaler = new NetheriteTargetScaler(functionId, this.metricsProvider, this); return true; } -#endif public class NetheriteScaleMetrics : ScaleMetrics { diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs index d5ce6f5..4fc5081 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs @@ -255,7 +255,7 @@ public bool TryGetScalingMonitor(out ScalingMonitor monitor) internal ILoadPublisherService GetLoadPublisher() { return string.IsNullOrEmpty(this.Settings.LoadInformationAzureTableName) ? - new AzureBlobLoadPublisher(this.Settings.BlobStorageConnection, this.Settings.HubName) + new AzureBlobLoadPublisher(this.Settings.BlobStorageConnection, this.Settings.HubName, this.Settings.TaskhubParametersFilePath) : new AzureTableLoadPublisher(this.Settings.TableStorageConnection, this.Settings.LoadInformationAzureTableName, this.Settings.HubName); } diff --git a/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs b/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs index c2dc077..bd052d4 100644 --- a/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs +++ b/src/DurableTask.Netherite/Scaling/NetheriteMetricsProvider.cs @@ -15,6 +15,9 @@ public class NetheriteMetricsProvider readonly ILoadPublisherService loadPublisher; readonly ConnectionInfo eventHubsConnection; + DateTime lastMetricsQueryTime = DateTime.MinValue; + Metrics metrics = default; + public NetheriteMetricsProvider( ILoadPublisherService loadPublisher, ConnectionInfo eventHubsConnection) @@ -26,15 +29,24 @@ public NetheriteMetricsProvider( public virtual async Task GetMetricsAsync() { DateTime now = DateTime.UtcNow; - var loadInformation = await this.loadPublisher.QueryAsync(CancellationToken.None).ConfigureAwait(false); - var busy = await this.TaskHubIsIdleAsync(loadInformation).ConfigureAwait(false); - return new Metrics() + // Collect the metrics every 5 seconds to avoid excessive poling. + // If calling this method more frequently, return the cached metrics. + if ( now >= this.lastMetricsQueryTime.AddSeconds(5)) { - LoadInformation = loadInformation, - Busy = busy, - Timestamp = now, - }; + var loadInformation = await this.loadPublisher.QueryAsync(CancellationToken.None).ConfigureAwait(false); + var busy = await this.TaskHubIsIdleAsync(loadInformation).ConfigureAwait(false); + + this.lastMetricsQueryTime = now; + this.metrics = new Metrics() + { + LoadInformation = loadInformation, + Busy = busy, + Timestamp = now, + }; + } + + return this.metrics; } /// From ea07ee0cc70eb0e907a4474c02230512ae22653c Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Tue, 11 Feb 2025 16:36:50 -0800 Subject: [PATCH 08/13] add deleted package reference --- .../DurableTask.Netherite.Tests.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj b/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj index 9c06deb..de6eea9 100644 --- a/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj +++ b/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj @@ -10,6 +10,7 @@ + all From 79c67232bb95ff87e226956a4ba3cda57c812772 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Thu, 13 Feb 2025 12:00:52 -0800 Subject: [PATCH 09/13] add unit tests for target based scaling, and fix 2 bugs found by tests (#446) --- .../NetheriteTargetScaler.cs | 5 +- ...Task.Netherite.AzureFunctions.Tests.csproj | 2 +- .../TargetBasedScalingTests.cs | 153 ++++++++++++++---- 3 files changed, 129 insertions(+), 31 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs index b76c5d7..9aab940 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs @@ -43,7 +43,8 @@ public async Task GetScaleResultAsync(TargetScalerContext co if (metrics.TaskHubIsIdle) { - target = 0; // we need no workers + this.scaleResult.TargetWorkerCount = 0; // we need no workers + return this.scaleResult; } target = 1; // always need at least one worker when we are not idle @@ -52,7 +53,7 @@ public async Task GetScaleResultAsync(TargetScalerContext co int activities = metrics.LoadInformation.Where(info => info.Value.IsLoaded()).Sum(info => info.Value.Activities); if (activities > 0) { - int requestedWorkers = (activities / maxConcurrentActivities) + 1; + int requestedWorkers = (activities + (maxConcurrentActivities - 1)) / maxConcurrentActivities; // rounded-up integer division requestedWorkers = Math.Min(requestedWorkers, metrics.LoadInformation.Count); // cannot use more workers than partitions target = Math.Max(target, requestedWorkers); } diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index 97adcda..a506dd5 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -4,7 +4,7 @@ net6.0 false true - 8.0 + 12.0 ..\..\sign.snk diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs index 3034733..2dee395 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs @@ -5,6 +5,7 @@ namespace DurableTask.Netherite.AzureFunctions.Tests { using System; using System.Collections.Generic; + using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Netherite.Scaling; using DurableTask.Netherite.Tests; @@ -37,23 +38,28 @@ public TargetBasedScalingTests(ITestOutputHelper output) : base(output) } [Theory] - [InlineData(10, 10, 2)] - [InlineData(20, 20, 2)] - public async void TargetBasedScalingTest(int maxConcurrentTaskActivityWorkItems, int maxConcurrentTaskOrchestrationWorkItems, int expectedTargetWorkerCount) + [MemberData(nameof(Tests))] + public async Task TargetBasedScalingTest(TestData testData) { - this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(maxConcurrentTaskActivityWorkItems); - this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(maxConcurrentTaskOrchestrationWorkItems); + this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(testData.MaxA); + this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(testData.MaxO); - var loadInformation = new Dictionary() + if (testData.Current > testData.LoadInfos.Count) { - { 1, this.Create("A") }, - { 2, this.Create("B") }, - }; + throw new ArgumentException("invalid test parameter", nameof(testData.Current)); + } + var loadInformation = new Dictionary(); + for (int i = 0; i < testData.LoadInfos.Count; i++) + { + testData.LoadInfos[i].WorkerId = $"worker{Math.Min(i, testData.Current - 1)}"; + loadInformation.Add((uint)i, testData.LoadInfos[i]); + }; + var testMetrics = new Metrics() { LoadInformation = loadInformation, - Busy = "busy", + Busy = testData.Busy, Timestamp = DateTime.UtcNow, }; @@ -73,28 +79,119 @@ public async void TargetBasedScalingTest(int maxConcurrentTaskActivityWorkItems, TargetScalerResult result = await targetScaler.GetScaleResultAsync(new TargetScalerContext()); - Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount); + Assert.Equal(testData.Expected, result.TargetWorkerCount); } - PartitionLoadInfo Create(string worker) + public record TestData( + int Expected, + List LoadInfos, + string Busy = "busy", + int MaxA = 10, + int MaxO = 10, + int Current = 1) { - return new PartitionLoadInfo() + } + + public static TheoryData Tests + { + get { - WorkerId = worker, - Activities = 11, - CacheMB = 1.1, - CachePct = 33, - CommitLogPosition = 64, - InputQueuePosition = 1231, - Instances = 3, - LatencyTrend = "IIIII", - MissRate = 0.1, - Outbox = 44, - Requests = 55, - Timers = 66, - Wakeup = DateTime.Parse("2022-10-08T17:00:44.7400082Z").ToUniversalTime(), - WorkItems = 77 - }; + var data = new TheoryData(); + + // if "Busy" is null, the task hub is idle and the expected target is zero + data.Add(new TestData(Expected: 0, [ Idle, FastForAWhile, NowSlow ], Busy: null)); + + // with backlog of activities, target is set to the number of activities divided by the max activities per worker (but capped by partition count) + data.Add(new TestData(Expected: 1, [Act_5000, Idle, Idle], MaxA: 5000)); + data.Add(new TestData(Expected: 1, [Act_4999, Idle, Idle], MaxA: 5000)); + data.Add(new TestData(Expected: 2, [Act_5001, Idle, Idle], MaxA: 5000)); + data.Add(new TestData(Expected: 10, [Act_5000, Idle, FastOnlyNow, Act_5000, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle], MaxA: 1000)); + data.Add(new TestData(Expected: 6, [Act_5000, Idle, FastOnlyNow, Act_5000, Idle, Idle], MaxA: 1000)); + + // with load-challenged partitions, target is at least the number of challenged partitions + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, Act_5000, Act_5000, Idle, Idle], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, FastOnlyNow, FastForAWhile], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, WasSlow, Partial], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, Orch_5000, FastOnlyNow, AlmostIdle], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, Orch_5000, WasSlow], MaxA: 50000, MaxO: 50000)); + + // scale down: if current is above non-idle partitions, scale down to non-idle partitions + data.Add(new TestData(Expected: 5, [Act_5000, Act_5000, AlmostIdle, AlmostIdle, AlmostIdle, Idle], Current: 6, MaxA: 5000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, Partial, Partial, Idle, Idle], Current: 6, MaxA: 5000)); + + // scale down below non-idle partitions: dont if some partitions are incomplete or show any slowness; otherwise by 1 + data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, Partial], Current: 4)); + data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, FastOnlyNow], Current: 4)); + data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, WasSlow], Current: 4)); + data.Add(new TestData(Expected: 3, [FastForAWhile, FastForAWhile, AlmostIdle, AlmostIdle], Current: 4)); + data.Add(new TestData(Expected: 3, [FastForAWhile, FastForAWhile, AlmostIdle, FastForAWhile], Current: 4)); + + return data; + } } + + static PartitionLoadInfo Idle => new PartitionLoadInfo() + { + LatencyTrend = "IIIII", + }; + + static PartitionLoadInfo FastOnlyNow => new PartitionLoadInfo() + { + LatencyTrend = "MMHML", + }; + + static PartitionLoadInfo FastForAWhile => new PartitionLoadInfo() + { + LatencyTrend = "LLLLL", + }; + + static PartitionLoadInfo Partial => new PartitionLoadInfo() + { + LatencyTrend = "I", + }; + + static PartitionLoadInfo NowSlow => new PartitionLoadInfo() + { + LatencyTrend = "IIIIM", + }; + + static PartitionLoadInfo NowVerySlow => new PartitionLoadInfo() + { + LatencyTrend = "IIIIH", + }; + + static PartitionLoadInfo WasSlow => new PartitionLoadInfo() + { + LatencyTrend = "MIIII", + }; + + static PartitionLoadInfo AlmostIdle => new PartitionLoadInfo() + { + LatencyTrend = "LIIII", + }; + + static PartitionLoadInfo Act_5000 => new PartitionLoadInfo() + { + Activities = 5000, + LatencyTrend = "MMMMM", + }; + + static PartitionLoadInfo Act_4999 => new PartitionLoadInfo() + { + Activities = 4999, + LatencyTrend = "MMMMM", + }; + + static PartitionLoadInfo Act_5001 => new PartitionLoadInfo() + { + Activities = 5001, + LatencyTrend = "MMMMM", + }; + + static PartitionLoadInfo Orch_5000 => new PartitionLoadInfo() + { + WorkItems = 5000, + LatencyTrend = "IIIII", + }; } } From add0518d7d5286e6771e4f1470c21b217ca95a9c Mon Sep 17 00:00:00 2001 From: Naiyuan Tian <110135109+nytian@users.noreply.github.com> Date: Thu, 13 Feb 2025 12:18:05 -0800 Subject: [PATCH 10/13] increase version to 3.1.0 --- src/common.props | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common.props b/src/common.props index 8a22e78..b0ad35e 100644 --- a/src/common.props +++ b/src/common.props @@ -3,7 +3,7 @@ 3 - 0 + 1 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) @@ -11,4 +11,4 @@ .$(GITHUB_RUN_NUMBER) $(VersionPrefix)$(BuildSuffix) - \ No newline at end of file + From 8efde4e96647b169f1d405f9d05b4253baa03ea6 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Thu, 13 Feb 2025 12:35:11 -0800 Subject: [PATCH 11/13] update index --- .../DurableTask.Netherite.AzureFunctions.Tests.csproj | 6 ++++-- .../DurableTask.Netherite.Tests.csproj | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index a506dd5..2e33f4d 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -21,10 +21,12 @@ + + + + - - diff --git a/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj b/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj index de6eea9..943dbac 100644 --- a/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj +++ b/test/DurableTask.Netherite.Tests/DurableTask.Netherite.Tests.csproj @@ -10,7 +10,7 @@ - + all From 6b4a5a9f2bb0fa496c8ffe849752d193eb49abae Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Thu, 13 Feb 2025 12:36:38 -0800 Subject: [PATCH 12/13] add empty line --- .../DurableTask.Netherite.AzureFunctions.Tests.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index 2e33f4d..4428f4b 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -29,4 +29,5 @@ + From cfe7249eb35287a5c45e22c98efa01a8321df26e Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Fri, 14 Feb 2025 11:21:05 -0800 Subject: [PATCH 13/13] update by comments make loadpublisher class property --- .../NetheriteProvider.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs index 2fcfec2..f668bcc 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteProvider.cs @@ -51,6 +51,7 @@ public NetheriteProvider( public override string EventSourceName => "DurableTask-Netherite"; NetheriteMetricsProvider metricsProvider; + ILoadPublisherService loadPublisher; /// public async override Task RetrieveSerializedEntityState(EntityId entityId, JsonSerializerSettings serializerSettings) @@ -138,13 +139,12 @@ public override bool TryGetTargetScaler( string hubName, string connectionName, out ITargetScaler targetScaler) - { - ILoadPublisherService loadPublisher = this.Service.GetLoadPublisher(); - + { // Target Scaler is created per function id. And they share the same NetheriteMetricsProvider. if ( this.metricsProvider == null) { - this.metricsProvider = this.Service.GetNetheriteMetricsProvider(loadPublisher, this.Settings.EventHubsConnection); + this.loadPublisher ??= this.Service.GetLoadPublisher(); + this.metricsProvider = this.Service.GetNetheriteMetricsProvider(this.loadPublisher, this.Settings.EventHubsConnection); } targetScaler = new NetheriteTargetScaler(functionId, this.metricsProvider, this);