diff --git a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs index b76c5d75..9aab9405 100644 --- a/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs +++ b/src/DurableTask.Netherite.AzureFunctions/NetheriteTargetScaler.cs @@ -43,7 +43,8 @@ public async Task GetScaleResultAsync(TargetScalerContext co if (metrics.TaskHubIsIdle) { - target = 0; // we need no workers + this.scaleResult.TargetWorkerCount = 0; // we need no workers + return this.scaleResult; } target = 1; // always need at least one worker when we are not idle @@ -52,7 +53,7 @@ public async Task GetScaleResultAsync(TargetScalerContext co int activities = metrics.LoadInformation.Where(info => info.Value.IsLoaded()).Sum(info => info.Value.Activities); if (activities > 0) { - int requestedWorkers = (activities / maxConcurrentActivities) + 1; + int requestedWorkers = (activities + (maxConcurrentActivities - 1)) / maxConcurrentActivities; // rounded-up integer division requestedWorkers = Math.Min(requestedWorkers, metrics.LoadInformation.Count); // cannot use more workers than partitions target = Math.Max(target, requestedWorkers); } diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj index 97adcdaf..a506dd5c 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj +++ b/test/DurableTask.Netherite.AzureFunctions.Tests/DurableTask.Netherite.AzureFunctions.Tests.csproj @@ -4,7 +4,7 @@ net6.0 false true - 8.0 + 12.0 ..\..\sign.snk diff --git a/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs index 30347336..2dee395e 100644 --- a/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs 
+++ b/test/DurableTask.Netherite.AzureFunctions.Tests/TargetBasedScalingTests.cs @@ -5,6 +5,7 @@ namespace DurableTask.Netherite.AzureFunctions.Tests { using System; using System.Collections.Generic; + using System.Threading.Tasks; using DurableTask.Core; using DurableTask.Netherite.Scaling; using DurableTask.Netherite.Tests; @@ -37,23 +38,28 @@ public TargetBasedScalingTests(ITestOutputHelper output) : base(output) } [Theory] - [InlineData(10, 10, 2)] - [InlineData(20, 20, 2)] - public async void TargetBasedScalingTest(int maxConcurrentTaskActivityWorkItems, int maxConcurrentTaskOrchestrationWorkItems, int expectedTargetWorkerCount) + [MemberData(nameof(Tests))] + public async Task TargetBasedScalingTest(TestData testData) { - this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(maxConcurrentTaskActivityWorkItems); - this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(maxConcurrentTaskOrchestrationWorkItems); + this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(testData.MaxA); + this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(testData.MaxO); - var loadInformation = new Dictionary() + if (testData.Current > testData.LoadInfos.Count) { - { 1, this.Create("A") }, - { 2, this.Create("B") }, - }; + throw new ArgumentException("invalid test parameter", nameof(testData.Current)); + } + var loadInformation = new Dictionary(); + for (int i = 0; i < testData.LoadInfos.Count; i++) + { + testData.LoadInfos[i].WorkerId = $"worker{Math.Min(i, testData.Current - 1)}"; + loadInformation.Add((uint)i, testData.LoadInfos[i]); + }; + var testMetrics = new Metrics() { LoadInformation = loadInformation, - Busy = "busy", + Busy = testData.Busy, Timestamp = DateTime.UtcNow, }; @@ -73,28 +79,119 @@ public async void TargetBasedScalingTest(int maxConcurrentTaskActivityWorkItems, TargetScalerResult result = await 
targetScaler.GetScaleResultAsync(new TargetScalerContext()); - Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount); + Assert.Equal(testData.Expected, result.TargetWorkerCount); } - PartitionLoadInfo Create(string worker) + public record TestData( + int Expected, + List LoadInfos, + string Busy = "busy", + int MaxA = 10, + int MaxO = 10, + int Current = 1) { - return new PartitionLoadInfo() + } + + public static TheoryData Tests + { + get { - WorkerId = worker, - Activities = 11, - CacheMB = 1.1, - CachePct = 33, - CommitLogPosition = 64, - InputQueuePosition = 1231, - Instances = 3, - LatencyTrend = "IIIII", - MissRate = 0.1, - Outbox = 44, - Requests = 55, - Timers = 66, - Wakeup = DateTime.Parse("2022-10-08T17:00:44.7400082Z").ToUniversalTime(), - WorkItems = 77 - }; + var data = new TheoryData(); + + // if "Busy" is null, the task hub is idle and the expected target is zero + data.Add(new TestData(Expected: 0, [ Idle, FastForAWhile, NowSlow ], Busy: null)); + + // with backlog of activities, target is set to the number of activities divided by the max activities per worker (but capped by partition count) + data.Add(new TestData(Expected: 1, [Act_5000, Idle, Idle], MaxA: 5000)); + data.Add(new TestData(Expected: 1, [Act_4999, Idle, Idle], MaxA: 5000)); + data.Add(new TestData(Expected: 2, [Act_5001, Idle, Idle], MaxA: 5000)); + data.Add(new TestData(Expected: 10, [Act_5000, Idle, FastOnlyNow, Act_5000, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle], MaxA: 1000)); + data.Add(new TestData(Expected: 6, [Act_5000, Idle, FastOnlyNow, Act_5000, Idle, Idle], MaxA: 1000)); + + // with load-challenged partitions, target is at least the number of challenged partitions + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, Act_5000, Act_5000, Idle, Idle], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, FastOnlyNow, FastForAWhile], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, 
Act_5000, NowSlow, NowVerySlow, WasSlow, Partial], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, Orch_5000, FastOnlyNow, AlmostIdle], MaxA: 50000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, Orch_5000, WasSlow], MaxA: 50000, MaxO: 50000)); + + // scale down: if current is above non-idle partitions, scale down to non-idle partitions + data.Add(new TestData(Expected: 5, [Act_5000, Act_5000, AlmostIdle, AlmostIdle, AlmostIdle, Idle], Current: 6, MaxA: 5000)); + data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, Partial, Partial, Idle, Idle], Current: 6, MaxA: 5000)); + + // scale down below non-idle partitions: don't if some partitions are incomplete or show any slowness; otherwise by 1 + data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, Partial], Current: 4)); + data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, FastOnlyNow], Current: 4)); + data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, WasSlow], Current: 4)); + data.Add(new TestData(Expected: 3, [FastForAWhile, FastForAWhile, AlmostIdle, AlmostIdle], Current: 4)); + data.Add(new TestData(Expected: 3, [FastForAWhile, FastForAWhile, AlmostIdle, FastForAWhile], Current: 4)); + + return data; + } } + + static PartitionLoadInfo Idle => new PartitionLoadInfo() + { + LatencyTrend = "IIIII", + }; + + static PartitionLoadInfo FastOnlyNow => new PartitionLoadInfo() + { + LatencyTrend = "MMHML", + }; + + static PartitionLoadInfo FastForAWhile => new PartitionLoadInfo() + { + LatencyTrend = "LLLLL", + }; + + static PartitionLoadInfo Partial => new PartitionLoadInfo() + { + LatencyTrend = "I", + }; + + static PartitionLoadInfo NowSlow => new PartitionLoadInfo() + { + LatencyTrend = "IIIIM", + }; + + static PartitionLoadInfo NowVerySlow => new PartitionLoadInfo() + { + LatencyTrend = "IIIIH", + }; + + static PartitionLoadInfo WasSlow => new PartitionLoadInfo() + { + 
LatencyTrend = "MIIII", + }; + + static PartitionLoadInfo AlmostIdle => new PartitionLoadInfo() + { + LatencyTrend = "LIIII", + }; + + static PartitionLoadInfo Act_5000 => new PartitionLoadInfo() + { + Activities = 5000, + LatencyTrend = "MMMMM", + }; + + static PartitionLoadInfo Act_4999 => new PartitionLoadInfo() + { + Activities = 4999, + LatencyTrend = "MMMMM", + }; + + static PartitionLoadInfo Act_5001 => new PartitionLoadInfo() + { + Activities = 5001, + LatencyTrend = "MMMMM", + }; + + static PartitionLoadInfo Orch_5000 => new PartitionLoadInfo() + { + WorkItems = 5000, + LatencyTrend = "IIIII", + }; } }