Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -25,7 +26,7 @@ internal class ScaleManager : IScaleStatusProvider
private readonly IConfiguration _configuration;
private IOptions<ScaleOptions> _scaleOptions;
private IOptions<ConcurrencyOptions> _concurrencyOptions;
private static HashSet<string> _targetScalersInError = new HashSet<string>();
internal static ConcurrentDictionary<string, byte> _targetScalersInError = new ConcurrentDictionary<string, byte>();

public ScaleManager(
IScaleMonitorManager monitorManager,
Expand All @@ -42,7 +43,6 @@ public ScaleManager(
_metricsRepository = metricsRepository;
_concurrencyStatusRepository = concurrencyStatusRepository;
_logger = loggerFactory?.CreateLogger<ScaleManager>();
_targetScalersInError = new HashSet<string>();
_scaleOptions = scaleConfiguration;
_configuration = configuration;
_concurrencyOptions = concurrencyOptions;
Expand Down Expand Up @@ -167,27 +167,11 @@ private async Task<IDictionary<string, TargetScalerResult>> GetTargetScalersResu
_logger.LogDebug($"Snapshot dynamic concurrency for target scaler '{targetScaler.TargetScalerDescriptor.FunctionId}' is '{functionSnapshot.Concurrency}'");
}
}
TargetScalerResult result = null;
try
TargetScalerResult result = await TryGetScaleResultAsync(targetScaler, targetScaleStatusContext, _logger);
if (result == null)
{
result = await targetScaler.GetScaleResultAsync(targetScaleStatusContext);
}
catch (NotSupportedException ex)
{
string targetScalerUniqueId = GetTargetScalerFunctionUniqueId(targetScaler);

_logger.LogFunctionScaleError(
"Unable to use target based scaling, switching to metrics monitor.",
targetScaler.TargetScalerDescriptor.FunctionId,
ex);

lock (_targetScalersInError)
{
_targetScalersInError.Add(targetScalerUniqueId);
}

// Adding ScaleVote.None vote
result = new TargetScalerResult
// Target scaler does not support TBS — use ScaleVote.None
result = new TargetScalerResult
{
TargetWorkerCount = context.WorkerCount
};
Expand Down Expand Up @@ -231,7 +215,7 @@ internal static (List<IScaleMonitor>, List<ITargetScaler>) GetScalersToSample(
foreach (var scaler in targetScalers)
{
string scalerUniqueId = GetTargetScalerFunctionUniqueId(scaler);
if (!_targetScalersInError.Contains(scalerUniqueId))
if (!_targetScalersInError.ContainsKey(scalerUniqueId))
{
string assemblyName = GetAssemblyName(scaler.GetType());
bool featureDisabled = configuration.GetValue<string>(assemblyName) == "0";
Expand Down Expand Up @@ -290,11 +274,37 @@ internal static ScaleVote GetAggregateScaleVote(IEnumerable<ScaleVote> votes, Sc
return vote;
}

private static string GetTargetScalerFunctionUniqueId(ITargetScaler scaler)
/// <summary>
/// Attempts to get a scale result from a target scaler. If the scaler throws
/// NotSupportedException (e.g. missing Manage claim), it is added to
/// _targetScalersInError and null is returned, causing fallback to the
/// incremental scale monitor.
/// </summary>
internal static async Task<TargetScalerResult> TryGetScaleResultAsync(ITargetScaler targetScaler, TargetScalerContext context, ILogger logger, string caller = "GetScaleStatus")
{
return $"{GetAssemblyName(scaler.GetType())}-{scaler.TargetScalerDescriptor.FunctionId}";
try
{
return await targetScaler.GetScaleResultAsync(context).ConfigureAwait(false);
}
catch (NotSupportedException ex)
{
string scalerUniqueId = GetTargetScalerFunctionUniqueId(targetScaler);

logger?.LogFunctionScaleError(
$"Unable to use target based scaling, switching to metrics monitor. Detected by: {caller}.",
targetScaler.TargetScalerDescriptor.FunctionId,
ex);

_targetScalersInError.TryAdd(scalerUniqueId, 0);

return null;
}
}

internal static string GetTargetScalerFunctionUniqueId(ITargetScaler scaler)
{
return $"{GetAssemblyName(scaler.GetType())}-{scaler.TargetScalerDescriptor.FunctionId}";
}

private static string GetScaleMonitorFunctionUniqueId(IScaleMonitor monitor)
{
Expand Down
25 changes: 25 additions & 0 deletions src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ internal class ScaleMonitorService : IHostedService, IDisposable
private readonly ITargetScalerManager _targetScalerManager;
private readonly IConfiguration _configuration;
private bool _disposed;
internal static DateTime _nextTargetScalerValidationTime = DateTime.MinValue;
private static readonly TimeSpan _targetScalerValidationInterval = TimeSpan.FromMinutes(10);

public ScaleMonitorService(
IScaleStatusProvider scaleStausProvider,
Expand Down Expand Up @@ -94,6 +96,29 @@ private async Task TakeMetricsSamplesAsync()
{
var (scaleMonitorsToProcess, targetScalersToSample) = ScaleManager.GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration);

// Periodically probe target scalers to discover any that throw NotSupportedException.
// This ensures the primary host independently detects target scaler failures,
// even when the Scale Controller runs on a different worker.
// Runs on first tick, then every 10 minutes to avoid per-tick overhead
// while still catching runtime permission changes (e.g. managed identity revocation).
if (targetScalersToSample.Any() && DateTime.UtcNow >= _nextTargetScalerValidationTime)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is all about taking metrics samples for incremental scaling. It doesn't make any sense to be making TBS calls here. Also, this approach makes a bunch of service calls whose results are just thrown away.

Instead, why don't we address the fundamental issue which is the _targetScalersInError collection as you're using it is supposed to be shared state across N workers, similar to metrics samples and snapshots. So one thing you could do is store that info in storage in the same way so its accessible by all instances.

@alrod alrod Apr 9, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mathewc. A couple of points on the probe approach:

  1. Service call frequency — the probe wasn't per-tick. It ran on first tick, then cached for 10 minutes (_nextTargetScalerValidationTime). So it was one TBS call per scaler per 10 minutes, not "a bunch." For comparison, the concurrency snapshot (_concurrencyStatusRepository.ReadAsync) does a blob storage read on every SC call (~10s) with no caching.

  2. Storage-backed approach — I agree it's architecturally cleaner. I've implemented it in Fix target scaler fallback race condition via shared storage #3191 following the BlobStorageConcurrencyStatusRepository / NullConcurrencyStatusRepository pattern. It is a significantly bigger change (~700 lines vs ~150), but it fixes the root cause and keeps TBS calls out of the metrics sampling path.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think I like the storage option better. Seems less hacky. One question I do have on this whole "fallback" approach - this ScaleMonitorService won't even be polling metrics if IsRuntimeScalingEnabled=false which is the normal case, right? Only if RTS is being used will IsRuntimeScalingEnabled=true. So usually TakeMetricsSamplesAsync won't even be running, so wondering how the fallback works in the normal case.

{
foreach (var targetScaler in targetScalersToSample)
{
try
{
await ScaleManager.TryGetScaleResultAsync(targetScaler, new TargetScalerContext(), _logger, "ScaleMonitorService").ConfigureAwait(false);
}
catch (Exception)
{
// Ignore transient errors — only NotSupportedException triggers fallback
// and that is already handled inside TryGetScaleResultAsync.
}
}

_nextTargetScalerValidationTime = DateTime.UtcNow + _targetScalerValidationInterval;
}

if (scaleMonitorsToProcess.Any())
{
_logger.LogDebug($"Taking metrics samples for {scaleMonitorsToProcess.Count()} monitor(s).");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public async Task GetScalersToSample_FallsBackToMonitor_OnTargetScalerError()
Assert.Equal(result1.TargetWorkerCount, 1);
Assert.Equal(result1.Vote, ScaleVote.None);
var logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray();
Assert.Single(logs, x => x == "Function 'function1' error: Unable to use target based scaling, switching to metrics monitor.");
Assert.Single(logs, x => x == "Function 'function1' error: Unable to use target based scaling, switching to metrics monitor. Detected by: GetScaleStatus.");
_loggerProvider.ClearAllLogMessages();

var (monitors2, scalers2) = ScaleManager.GetScalersToSample(
Expand All @@ -400,7 +400,7 @@ public async Task GetScalersToSample_FallsBackToMonitor_OnTargetScalerError()
Assert.Equal(result2.TargetWorkerCount, null);
Assert.Equal(result2.Vote, ScaleVote.ScaleIn);
logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray();
Assert.DoesNotContain(logs, x => x == "Function 'function1' error: Unable to use target based scaling, switching to metrics monitor.");
Assert.DoesNotContain(logs, x => x == "Function 'function1' error: Unable to use target based scaling, switching to metrics monitor. Detected by: GetScaleStatus.");
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,88 @@ await TestHelpers.Await(() =>
var metricsWritten = _metricsRepository.Metrics[monitor2].Take(5);
Assert.Equal(testMetrics2, metricsWritten);
}
[Fact]
public async Task OnTimer_ProbesTargetScalers_FallsBackToMonitor()
{
// Arrange: create a fresh service with TBS enabled
var monitors = new List<IScaleMonitor>
{
new TestScaleMonitor<ScaleMetrics>("function1-test-test", "function1")
};
var scalers = new List<ITargetScaler>
{
new FaultyTargetScaler
{
TargetScalerDescriptor = new TargetScalerDescriptor("function1")
}
};

var monitorManagerMock = new Mock<IScaleMonitorManager>(MockBehavior.Strict);
monitorManagerMock.Setup(p => p.GetMonitors()).Returns(() => monitors);
var targetScalerManagerMock = new Mock<ITargetScalerManager>(MockBehavior.Strict);
targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(() => scalers);

var loggerProvider = new TestLoggerProvider();
var loggerFactory = new LoggerFactory();
loggerFactory.AddProvider(loggerProvider);

var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string> { { "Microsoft.Azure.WebJobs.Host.UnitTests", "1" } }).Build();

var options = Options.Create(new ScaleOptions
{
ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1),
IsRuntimeScalingEnabled = true,
IsTargetScalingEnabled = true
});

var primaryHostState = new PrimaryHostStateProvider { IsPrimary = true };

// Clear static state from prior tests
ScaleManager._targetScalersInError.Clear();
ScaleMonitorService._nextTargetScalerValidationTime = DateTime.MinValue;

var service = new ScaleMonitorService(
new Mock<ScaleManager>().Object,
new TestMetricsRepository(),
options,
primaryHostState,
monitorManagerMock.Object,
targetScalerManagerMock.Object,
configuration,
loggerFactory);

// Before starting: target scaler is not in error, so GetScalersToSample returns it as target scaler
var (monitors1, scalers1) = ScaleManager.GetScalersToSample(
monitorManagerMock.Object, targetScalerManagerMock.Object, options, configuration);
Assert.Empty(monitors1);
Assert.Single(scalers1);

// Act: start the service — the timer tick will probe the faulty target scaler
await service.StartAsync(CancellationToken.None);

await TestHelpers.Await(() =>
{
var logs = loggerProvider.GetAllLogMessages();
return logs.Any(l => l.FormattedMessage.Contains("Unable to use target based scaling"));
});

await service.StopAsync(CancellationToken.None);

// Assert: after probing, the faulty scaler is in _targetScalersInError
// so GetScalersToSample now returns the incremental monitor instead
var (monitors2, scalers2) = ScaleManager.GetScalersToSample(
monitorManagerMock.Object, targetScalerManagerMock.Object, options, configuration);
Assert.Single(monitors2);
Assert.Empty(scalers2);

// Verify the log message includes the ScaleMonitorService caller
var errorLogs = loggerProvider.GetAllLogMessages()
.Where(l => l.FormattedMessage.Contains("Unable to use target based scaling"))
.ToArray();
Assert.Single(errorLogs);
Assert.Contains("Detected by: ScaleMonitorService", errorLogs[0].FormattedMessage);
}
}

public class TestMetricsRepository : IScaleMetricsRepository
Expand Down