Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co

if (metrics.TaskHubIsIdle)
{
target = 0; // we need no workers
this.scaleResult.TargetWorkerCount = 0; // we need no workers
return this.scaleResult;
}

target = 1; // always need at least one worker when we are not idle
Expand All @@ -52,7 +53,7 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
int activities = metrics.LoadInformation.Where(info => info.Value.IsLoaded()).Sum(info => info.Value.Activities);
if (activities > 0)
{
int requestedWorkers = (activities / maxConcurrentActivities) + 1;
int requestedWorkers = (activities + (maxConcurrentActivities - 1)) / maxConcurrentActivities; // rounded-up integer division
requestedWorkers = Math.Min(requestedWorkers, metrics.LoadInformation.Count); // cannot use more workers than partitions
target = Math.Max(target, requestedWorkers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<SignAssembly>true</SignAssembly>
<LangVersion>8.0</LangVersion>
<LangVersion>12.0</LangVersion>
<AssemblyOriginatorKeyFile>..\..\sign.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace DurableTask.Netherite.AzureFunctions.Tests
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DurableTask.Core;
using DurableTask.Netherite.Scaling;
using DurableTask.Netherite.Tests;
Expand Down Expand Up @@ -37,23 +38,28 @@ public TargetBasedScalingTests(ITestOutputHelper output) : base(output)
}

[Theory]
[InlineData(10, 10, 2)]
[InlineData(20, 20, 2)]
public async void TargetBasedScalingTest(int maxConcurrentTaskActivityWorkItems, int maxConcurrentTaskOrchestrationWorkItems, int expectedTargetWorkerCount)
[MemberData(nameof(Tests))]
public async Task TargetBasedScalingTest(TestData testData)
{
this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(maxConcurrentTaskActivityWorkItems);
this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(maxConcurrentTaskOrchestrationWorkItems);
this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskActivityWorkItems).Returns(testData.MaxA);
this.orchestrationServiceMock.Setup(m => m.MaxConcurrentTaskOrchestrationWorkItems).Returns(testData.MaxO);

var loadInformation = new Dictionary<uint, PartitionLoadInfo>()
if (testData.Current > testData.LoadInfos.Count)
{
{ 1, this.Create("A") },
{ 2, this.Create("B") },
};
throw new ArgumentException("invalid test parameter", nameof(testData.Current));
}

var loadInformation = new Dictionary<uint, PartitionLoadInfo>();
for (int i = 0; i < testData.LoadInfos.Count; i++)
{
testData.LoadInfos[i].WorkerId = $"worker{Math.Min(i, testData.Current - 1)}";
loadInformation.Add((uint)i, testData.LoadInfos[i]);
};

var testMetrics = new Metrics()
{
LoadInformation = loadInformation,
Busy = "busy",
Busy = testData.Busy,
Timestamp = DateTime.UtcNow,
};

Expand All @@ -73,28 +79,119 @@ public async void TargetBasedScalingTest(int maxConcurrentTaskActivityWorkItems,

TargetScalerResult result = await targetScaler.GetScaleResultAsync(new TargetScalerContext());

Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount);
Assert.Equal(testData.Expected, result.TargetWorkerCount);
}

PartitionLoadInfo Create(string worker)
public record TestData(
int Expected,
List<PartitionLoadInfo> LoadInfos,
string Busy = "busy",
int MaxA = 10,
int MaxO = 10,
int Current = 1)
{
return new PartitionLoadInfo()
}

public static TheoryData<TestData> Tests
{
get
{
WorkerId = worker,
Activities = 11,
CacheMB = 1.1,
CachePct = 33,
CommitLogPosition = 64,
InputQueuePosition = 1231,
Instances = 3,
LatencyTrend = "IIIII",
MissRate = 0.1,
Outbox = 44,
Requests = 55,
Timers = 66,
Wakeup = DateTime.Parse("2022-10-08T17:00:44.7400082Z").ToUniversalTime(),
WorkItems = 77
};
var data = new TheoryData<TestData>();

// if "Busy" is null, the task hub is idle and the expected target is zero
data.Add(new TestData(Expected: 0, [ Idle, FastForAWhile, NowSlow ], Busy: null));

// with backlog of activities, target is set to the number of activities divided by the max activities per worker (but capped by partition count)
data.Add(new TestData(Expected: 1, [Act_5000, Idle, Idle], MaxA: 5000));
data.Add(new TestData(Expected: 1, [Act_4999, Idle, Idle], MaxA: 5000));
data.Add(new TestData(Expected: 2, [Act_5001, Idle, Idle], MaxA: 5000));
data.Add(new TestData(Expected: 10, [Act_5000, Idle, FastOnlyNow, Act_5000, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle, Idle], MaxA: 1000));
data.Add(new TestData(Expected: 6, [Act_5000, Idle, FastOnlyNow, Act_5000, Idle, Idle], MaxA: 1000));

// with load-challenged partitions, target is at least the number of challenged partitions
data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, Act_5000, Act_5000, Idle, Idle], MaxA: 50000));
data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, FastOnlyNow, FastForAWhile], MaxA: 50000));
data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, WasSlow, Partial], MaxA: 50000));
data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, Orch_5000, FastOnlyNow, AlmostIdle], MaxA: 50000));
data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, NowSlow, NowVerySlow, Orch_5000, WasSlow], MaxA: 50000, MaxO: 50000));

// scale down: if current is above non-idle partitions, scale down to non-idle partitions
data.Add(new TestData(Expected: 5, [Act_5000, Act_5000, AlmostIdle, AlmostIdle, AlmostIdle, Idle], Current: 6, MaxA: 5000));
data.Add(new TestData(Expected: 4, [Act_5000, Act_5000, Partial, Partial, Idle, Idle], Current: 6, MaxA: 5000));

// scale down below non-idle partitions: dont if some partitions are incomplete or show any slowness; otherwise by 1
data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, Partial], Current: 4));
data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, FastOnlyNow], Current: 4));
data.Add(new TestData(Expected: 4, [FastForAWhile, FastForAWhile, AlmostIdle, WasSlow], Current: 4));
data.Add(new TestData(Expected: 3, [FastForAWhile, FastForAWhile, AlmostIdle, AlmostIdle], Current: 4));
data.Add(new TestData(Expected: 3, [FastForAWhile, FastForAWhile, AlmostIdle, FastForAWhile], Current: 4));

return data;
}
}

static PartitionLoadInfo Idle => new PartitionLoadInfo()
{
LatencyTrend = "IIIII",
};

static PartitionLoadInfo FastOnlyNow => new PartitionLoadInfo()
{
LatencyTrend = "MMHML",
};

static PartitionLoadInfo FastForAWhile => new PartitionLoadInfo()
{
LatencyTrend = "LLLLL",
};

static PartitionLoadInfo Partial => new PartitionLoadInfo()
{
LatencyTrend = "I",
};

static PartitionLoadInfo NowSlow => new PartitionLoadInfo()
{
LatencyTrend = "IIIIM",
};

static PartitionLoadInfo NowVerySlow => new PartitionLoadInfo()
{
LatencyTrend = "IIIIH",
};

static PartitionLoadInfo WasSlow => new PartitionLoadInfo()
{
LatencyTrend = "MIIII",
};

static PartitionLoadInfo AlmostIdle => new PartitionLoadInfo()
{
LatencyTrend = "LIIII",
};

static PartitionLoadInfo Act_5000 => new PartitionLoadInfo()
{
Activities = 5000,
LatencyTrend = "MMMMM",
};

static PartitionLoadInfo Act_4999 => new PartitionLoadInfo()
{
Activities = 4999,
LatencyTrend = "MMMMM",
};

static PartitionLoadInfo Act_5001 => new PartitionLoadInfo()
{
Activities = 5001,
LatencyTrend = "MMMMM",
};

static PartitionLoadInfo Orch_5000 => new PartitionLoadInfo()
{
WorkItems = 5000,
LatencyTrend = "IIIII",
};
}
}
Loading