diff --git a/src/Microsoft.Azure.WebJobs.Host.Storage/BlobStorageTargetScalerErrorRepository.cs b/src/Microsoft.Azure.WebJobs.Host.Storage/BlobStorageTargetScalerErrorRepository.cs new file mode 100644 index 000000000..40d8e08aa --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host.Storage/BlobStorageTargetScalerErrorRepository.cs @@ -0,0 +1,181 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +#nullable enable + +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Azure; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.Storage; +using Microsoft.Azure.WebJobs.Logging; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Host +{ + internal class BlobStorageTargetScalerErrorRepository : ITargetScalerErrorRepository + { + private readonly IHostIdProvider _hostIdProvider; + private readonly ILogger _logger; + private readonly IAzureBlobStorageProvider _blobStorageProvider; + private BlobContainerClient? _blobContainerClient; + + private const int MaxRetries = 3; + internal static readonly TimeSpan DefaultTtl = TimeSpan.FromMinutes(10); + + public BlobStorageTargetScalerErrorRepository(IHostIdProvider hostIdProvider, ILoggerFactory loggerFactory, IAzureBlobStorageProvider azureStorageProvider) + { + _hostIdProvider = hostIdProvider; + _logger = loggerFactory.CreateLogger(LogCategories.Scale); + _blobStorageProvider = azureStorageProvider; + } + + public async Task AddAsync(string scalerUniqueId, CancellationToken cancellationToken) + { + try + { + for (int attempt = 0; attempt < MaxRetries; attempt++) + { + // Read current state with ETag + var (state, etag) = await ReadBlobWithETagAsync(cancellationToken); + var set = state?.Scalers ?? new HashSet(); + set.Add(scalerUniqueId); + + var newState = new TargetScalerErrorState + { + Scalers = set, + LastUpdated = DateTime.UtcNow + }; + + try + { + await WriteBlobAsync(newState, etag, cancellationToken); + return; + } + catch (RequestFailedException ex) when (ex.Status == 412 || ex.Status == 409) + { + // ETag mismatch — another instance wrote concurrently, retry + } + } + + _logger.LogWarning("Failed to persist target scaler error state after {MaxRetries} attempts due to concurrent updates.", MaxRetries); + } + catch (Exception e) + { + _logger.LogError(e, "Error persisting target scaler error state."); + } + } + + public async Task> GetAsync(CancellationToken cancellationToken) + { + try + { + var (state, _) = await ReadBlobWithETagAsync(cancellationToken); + if (state?.LastUpdated != null && (DateTime.UtcNow - state.LastUpdated.Value) > DefaultTtl) + { + // Data is stale — treat as empty so target scalers are re-evaluated + return new HashSet(); + } + return state?.Scalers ?? new HashSet(); + } + catch (Exception e) + { + _logger.LogError(e, "Error reading target scaler error state."); + return new HashSet(); + } + } + + private async Task<(TargetScalerErrorState?, ETag)> ReadBlobWithETagAsync(CancellationToken cancellationToken) + { + string blobPath = await GetBlobPathAsync(cancellationToken); + + try + { + BlobContainerClient? containerClient = await GetContainerClientAsync(cancellationToken); + if (containerClient != null) + { + BlobClient blobClient = containerClient.GetBlobClient(blobPath); + var response = await blobClient.DownloadAsync(cancellationToken: cancellationToken); + + string content; + using (StreamReader reader = new StreamReader(response.Value.Content, true)) + { + content = reader.ReadToEnd(); + } + + if (!string.IsNullOrEmpty(content)) + { + var state = JsonConvert.DeserializeObject(content); + return (state, response.Value.Details.ETag); + } + } + } + catch (RequestFailedException exception) when (exception.Status == 404) + { + // blob doesn't exist yet — no errors recorded + return (null, default); + } + + return (null, default); + } + + private async Task WriteBlobAsync(TargetScalerErrorState state, ETag etag, CancellationToken cancellationToken) + { + string blobPath = await GetBlobPathAsync(cancellationToken); + BlobContainerClient? containerClient = await GetContainerClientAsync(cancellationToken); + if (containerClient != null) + { + BlobClient blobClient = containerClient.GetBlobClient(blobPath); + var content = JsonConvert.SerializeObject(state); + using (Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(content))) + { + var options = new BlobUploadOptions(); + if (etag != default) + { + // Existing blob — only write if it hasn't changed since we read it + options.Conditions = new BlobRequestConditions { IfMatch = etag }; + } + else + { + // No blob exists yet — only create if it still doesn't exist + options.Conditions = new BlobRequestConditions { IfNoneMatch = ETag.All }; + } + await blobClient.UploadAsync(stream, options, cancellationToken); + } + } + } + + internal async Task GetContainerClientAsync(CancellationToken cancellationToken) + { + if (_blobContainerClient == null && _blobStorageProvider.TryCreateHostingBlobContainerClient(out _blobContainerClient)) + { + await _blobContainerClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken); + } + + return _blobContainerClient; + } + + internal async Task GetBlobPathAsync(CancellationToken cancellationToken) + { + string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken); + return $"scale/{hostId}/targetScalersInError.json"; + } + + internal class TargetScalerErrorState + { + [JsonProperty("scalers")] + public HashSet Scalers { get; set; } = new HashSet(); + + [JsonProperty("lastUpdated")] + public DateTime? LastUpdated { get; set; } + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs index 4e8737aa3..21e8cb6f8 100644 --- a/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs @@ -34,6 +34,7 @@ public static void AddAzureStorageCoreServices(this IServiceCollection services) services.TryAddSingleton(); services.AddSingleton(); + services.AddSingleton(); } public static void AddAzureStorageScaleServices(this IServiceCollection services) @@ -41,6 +42,7 @@ public static void AddAzureStorageScaleServices(this IServiceCollection services services.TryAddEnumerable(ServiceDescriptor.Transient, CoreWebJobsOptionsSetup>()); services.TryAddSingleton(); services.AddSingleton(); + services.AddSingleton(); } // This is only called if the host didn't already provide an implementation diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs index 1b30cb45f..64298f7d0 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs @@ -122,6 +122,9 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.AddOptionsLogging(); // Concurrency management services.TryAddSingleton(); + + // Target scaler error state + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddEnumerable(ServiceDescriptor.Singleton()); diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerErrorRepository.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerErrorRepository.cs new file mode 100644 index 000000000..aeda12d34 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScalerErrorRepository.cs @@ -0,0 +1,32 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Provides functionality for persisting target scaler errors across multiple host instances. + /// When a target scaler throws , the scaler identifier + /// is recorded so all instances can fall back to incremental scale monitoring. + /// + internal interface ITargetScalerErrorRepository + { + /// + /// Adds a target scaler identifier to the set of scalers in error. + /// + /// The unique identifier of the target scaler. + /// A cancellation token. + /// A task that completes when the write is finished. + Task AddAsync(string scalerUniqueId, CancellationToken cancellationToken); + + /// + /// Returns the set of target scaler identifiers currently in error. + /// + /// A cancellation token. + /// A task that returns the set of scaler identifiers in error. + Task> GetAsync(CancellationToken cancellationToken); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/NullTargetScalerErrorRepository.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/NullTargetScalerErrorRepository.cs new file mode 100644 index 000000000..84debe852 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/NullTargetScalerErrorRepository.cs @@ -0,0 +1,23 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + internal class NullTargetScalerErrorRepository : ITargetScalerErrorRepository + { + public Task AddAsync(string scalerUniqueId, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task> GetAsync(CancellationToken cancellationToken) + { + ISet result = new HashSet(); + return Task.FromResult(result); + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs index bef51f687..ab0bfca1e 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs @@ -21,17 +21,18 @@ internal class ScaleManager : IScaleStatusProvider private readonly ITargetScalerManager _targetScalerManager; private readonly IScaleMetricsRepository _metricsRepository; private readonly IConcurrencyStatusRepository _concurrencyStatusRepository; + private readonly ITargetScalerErrorRepository _targetScalerErrorRepository; private readonly ILogger _logger; private readonly IConfiguration _configuration; private IOptions _scaleOptions; private IOptions _concurrencyOptions; - private static HashSet _targetScalersInError = new HashSet(); public ScaleManager( IScaleMonitorManager monitorManager, ITargetScalerManager targetScalerManager, IScaleMetricsRepository metricsRepository, IConcurrencyStatusRepository concurrencyStatusRepository, + ITargetScalerErrorRepository targetScalerErrorRepository, IOptions scaleConfiguration, ILoggerFactory loggerFactory, IConfiguration configuration, @@ -41,8 +42,8 @@ public ScaleManager( _targetScalerManager = targetScalerManager; _metricsRepository = metricsRepository; _concurrencyStatusRepository = concurrencyStatusRepository; + _targetScalerErrorRepository = targetScalerErrorRepository; _logger = loggerFactory?.CreateLogger(); - _targetScalersInError = new HashSet(); _scaleOptions = scaleConfiguration; _configuration = configuration; _concurrencyOptions = concurrencyOptions; @@ -60,7 +61,7 @@ internal ScaleManager() /// A task that returns the . public async Task GetScaleStatusAsync(ScaleStatusContext context) { - var (scaleMonitorsToProcess, targetScalersToProcess) = GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration); + var (scaleMonitorsToProcess, targetScalersToProcess) = await GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration, _targetScalerErrorRepository); var scaleStatuses = await GetScaleMonitorsResultAsync(context, scaleMonitorsToProcess); var targetScalerResults = await GetTargetScalersResultAsync(context, targetScalersToProcess); @@ -181,10 +182,7 @@ private async Task> GetTargetScalersResu targetScaler.TargetScalerDescriptor.FunctionId, ex); - lock (_targetScalersInError) - { - _targetScalersInError.Add(targetScalerUniqueId); - } + await _targetScalerErrorRepository.AddAsync(targetScalerUniqueId, CancellationToken.None); // Adding ScaleVote.None vote result = new TargetScalerResult @@ -212,11 +210,12 @@ private async Task> GetTargetScalersResu /// Returns scale monitors and target scalers we want to use based on the configuration. /// Scaler monitor will be ignored if a target scaler is defined in the same extensions assembly and TBS is enabled. /// - internal static (List, List) GetScalersToSample( + internal static async Task<(List, List)> GetScalersToSample( IScaleMonitorManager monitorManager, ITargetScalerManager targetScalerManager, IOptions scaleOptions, - IConfiguration configuration) + IConfiguration configuration, + ITargetScalerErrorRepository targetScalerErrorRepository) { var scaleMonitors = monitorManager.GetMonitors(); var targetScalers = targetScalerManager.GetTargetScalers(); @@ -228,10 +227,11 @@ internal static (List, List) GetScalersToSample( if (scaleOptions.Value.IsTargetScalingEnabled) { HashSet targetScalerFunctions = new HashSet(); + var errored = await targetScalerErrorRepository.GetAsync(CancellationToken.None); foreach (var scaler in targetScalers) { string scalerUniqueId = GetTargetScalerFunctionUniqueId(scaler); - if (!_targetScalersInError.Contains(scalerUniqueId)) + if (!errored.Contains(scalerUniqueId)) { string assemblyName = GetAssemblyName(scaler.GetType()); bool featureDisabled = configuration.GetValue(assemblyName) == "0"; diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs index 92cc8d578..ebdc76201 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs @@ -29,6 +29,7 @@ internal class ScaleMonitorService : IHostedService, IDisposable private readonly IScaleMonitorManager _monitorManager; private readonly ITargetScalerManager _targetScalerManager; private readonly IConfiguration _configuration; + private readonly ITargetScalerErrorRepository _targetScalerErrorRepository; private bool _disposed; public ScaleMonitorService( @@ -39,7 +40,8 @@ public ScaleMonitorService( IScaleMonitorManager monitorManager, ITargetScalerManager targetScalerManager, IConfiguration configuration, - ILoggerFactory loggerFactory) + ILoggerFactory loggerFactory, + ITargetScalerErrorRepository targetScalerErrorRepository = null) { _scaleStausProvider = scaleStausProvider; _metricsRepository = metricsRepository; @@ -50,6 +52,7 @@ public ScaleMonitorService( _monitorManager = monitorManager; _targetScalerManager = targetScalerManager; _configuration = configuration; + _targetScalerErrorRepository = targetScalerErrorRepository ?? new NullTargetScalerErrorRepository(); } public Task StartAsync(CancellationToken cancellationToken) @@ -92,7 +95,7 @@ private async Task TakeMetricsSamplesAsync() { try { - var (scaleMonitorsToProcess, targetScalersToSample) = ScaleManager.GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration); + var (scaleMonitorsToProcess, targetScalersToSample) = await ScaleManager.GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration, _targetScalerErrorRepository); if (scaleMonitorsToProcess.Any()) { diff --git a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/BlobStorageTargetScalerErrorRepositoryTests.cs b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/BlobStorageTargetScalerErrorRepositoryTests.cs new file mode 100644 index 000000000..9e28e0a08 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/BlobStorageTargetScalerErrorRepositoryTests.cs @@ -0,0 +1,186 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Azure.Storage.Blobs; +using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.Storage; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Logging; +using Moq; +using Newtonsoft.Json; +using Xunit; +using static Microsoft.Azure.WebJobs.Host.BlobStorageTargetScalerErrorRepository; + +namespace Microsoft.Azure.WebJobs.Host.EndToEndTests +{ + [Trait(TestTraits.CategoryTraitName, TestTraits.ScaleMonitoring)] + public class BlobStorageTargetScalerErrorRepositoryTests + { + private const string TestHostId = "test123"; + private readonly BlobStorageTargetScalerErrorRepository _repository; + private readonly LoggerFactory _loggerFactory; + private readonly TestLoggerProvider _loggerProvider; + private readonly Mock _mockHostIdProvider; + + public BlobStorageTargetScalerErrorRepositoryTests() + { + _loggerFactory = new LoggerFactory(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory.AddProvider(_loggerProvider); + + _mockHostIdProvider = new Mock(MockBehavior.Strict); + _mockHostIdProvider.Setup(p => p.GetHostIdAsync(CancellationToken.None)).ReturnsAsync(TestHostId); + + _repository = new BlobStorageTargetScalerErrorRepository(_mockHostIdProvider.Object, _loggerFactory, TestHelpers.GetTestAzureBlobStorageProvider()); + } + + [Fact] + public async Task GetBlobPathAsync_ReturnsExpectedPath() + { + string path = await _repository.GetBlobPathAsync(CancellationToken.None); + + Assert.Equal($"scale/{TestHostId}/targetScalersInError.json", path); + } + + [Fact] + public async Task AddAsync_WritesExpectedBlob() + { + await DeleteTestBlobsAsync(); + + // Verify blob doesn't exist + var path = await _repository.GetBlobPathAsync(CancellationToken.None); + BlobContainerClient blobContainerClient = await _repository.GetContainerClientAsync(CancellationToken.None); + BlobClient blobClient = blobContainerClient.GetBlobClient(path); + bool exists = await blobClient.ExistsAsync(); + Assert.False(exists); + + // Add a scaler error + await _repository.AddAsync("scaler-a", CancellationToken.None); + + // Verify blob was created with correct content + exists = await blobClient.ExistsAsync(); + Assert.True(exists); + + string content = await blobClient.DownloadTextAsync(); + var state = JsonConvert.DeserializeObject(content); + Assert.Single(state.Scalers); + Assert.Contains("scaler-a", state.Scalers); + Assert.NotNull(state.LastUpdated); + + // Add another and verify both are present + await _repository.AddAsync("scaler-b", CancellationToken.None); + content = await blobClient.DownloadTextAsync(); + state = JsonConvert.DeserializeObject(content); + Assert.Equal(2, state.Scalers.Count); + Assert.Contains("scaler-a", state.Scalers); + Assert.Contains("scaler-b", state.Scalers); + } + + [Fact] + public async Task GetAsync_ReadsExpectedBlob() + { + await DeleteTestBlobsAsync(); + + // Write a blob directly using the new state format + string path = await _repository.GetBlobPathAsync(CancellationToken.None); + BlobContainerClient blobContainerClient = await _repository.GetContainerClientAsync(CancellationToken.None); + BlobClient blobClient = blobContainerClient.GetBlobClient(path); + + var testState = new TargetScalerErrorState + { + Scalers = new HashSet { "scaler-x", "scaler-y" }, + LastUpdated = DateTime.UtcNow + }; + string content = JsonConvert.SerializeObject(testState); + await blobClient.UploadTextAsync(content, overwrite: true); + + // Read via repository + var result = await _repository.GetAsync(CancellationToken.None); + + Assert.Equal(2, result.Count); + Assert.Contains("scaler-x", result); + Assert.Contains("scaler-y", result); + } + + [Fact] + public async Task GetAsync_NoBlob_ReturnsEmpty() + { + await DeleteTestBlobsAsync(); + + var result = await _repository.GetAsync(CancellationToken.None); + + Assert.Empty(result); + } + + [Fact] + public async Task GetAsync_StaleData_ReturnsEmpty() + { + await DeleteTestBlobsAsync(); + + // Write a blob with an old timestamp (beyond the TTL) + string path = await _repository.GetBlobPathAsync(CancellationToken.None); + BlobContainerClient blobContainerClient = await _repository.GetContainerClientAsync(CancellationToken.None); + BlobClient blobClient = blobContainerClient.GetBlobClient(path); + + var staleState = new TargetScalerErrorState + { + Scalers = new HashSet { "scaler-stale" }, + LastUpdated = DateTime.UtcNow - BlobStorageTargetScalerErrorRepository.DefaultTtl - TimeSpan.FromMinutes(1) + }; + string content = JsonConvert.SerializeObject(staleState); + await blobClient.UploadTextAsync(content, overwrite: true); + + // GetAsync should return empty because data is beyond TTL + var result = await _repository.GetAsync(CancellationToken.None); + Assert.Empty(result); + } + + [Fact] + public async Task GetAsync_FreshData_ReturnsScalers() + { + await DeleteTestBlobsAsync(); + + // Write via AddAsync (writes current timestamp) + await _repository.AddAsync("scaler-fresh", CancellationToken.None); + + // GetAsync should return the scaler because data is fresh + var result = await _repository.GetAsync(CancellationToken.None); + Assert.Single(result); + Assert.Contains("scaler-fresh", result); + } + + [Fact] + public async Task NoStorageConnection_HandledGracefully() + { + var mockBlobStorageProvider = new Mock(MockBehavior.Strict); + BlobContainerClient blobContainerClient = null; + mockBlobStorageProvider.Setup(p => p.TryCreateHostingBlobContainerClient(out blobContainerClient)).Returns(false); + var localRepository = new BlobStorageTargetScalerErrorRepository(_mockHostIdProvider.Object, _loggerFactory, mockBlobStorageProvider.Object); + + var container = await localRepository.GetContainerClientAsync(CancellationToken.None); + Assert.Null(container); + + // These should not throw — they handle null container gracefully + await localRepository.AddAsync("scaler-a", CancellationToken.None); + + var result = await localRepository.GetAsync(CancellationToken.None); + Assert.Empty(result); + } + + private async Task DeleteTestBlobsAsync() + { + BlobContainerClient blobContainerClient = await _repository.GetContainerClientAsync(CancellationToken.None); + var blobItems = blobContainerClient.GetBlobsByHierarchyAsync(prefix: $"scale/{TestHostId}"); + await foreach (var blob in blobItems) + { + BlobClient blobClient = blobContainerClient.GetBlobClient(blob.Blob.Name); + await blobClient.DeleteAsync(); + } + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/InMemoryTargetScalerErrorRepository.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/InMemoryTargetScalerErrorRepository.cs new file mode 100644 index 000000000..719422efd --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/InMemoryTargetScalerErrorRepository.cs @@ -0,0 +1,33 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + /// + /// In-memory implementation of for testing. + /// Stores state in a so tests can + /// simulate cross-worker communication using a shared instance. + /// + internal class InMemoryTargetScalerErrorRepository : ITargetScalerErrorRepository + { + private readonly ConcurrentDictionary _scalersInError = new ConcurrentDictionary(); + + public Task AddAsync(string scalerUniqueId, CancellationToken cancellationToken) + { + _scalersInError.TryAdd(scalerUniqueId, 0); + return Task.CompletedTask; + } + + public Task> GetAsync(CancellationToken cancellationToken) + { + ISet result = new HashSet(_scalersInError.Keys); + return Task.FromResult(result); + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs index c21e5dd3e..fe79051c5 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs @@ -31,7 +31,7 @@ public class ScaleManagerTests private readonly IOptions _scaleOptions; private readonly IOptions _concurrencyOptions; private readonly IConfiguration _configuration; - private readonly HashSet _targetScalersInError; + private readonly ITargetScalerErrorRepository _targetScalerErrorRepository; public ScaleManagerTests() { @@ -72,7 +72,7 @@ public ScaleManagerTests() _configuration = new ConfigurationBuilder() .AddInMemoryCollection(new Dictionary { { "Microsoft.Azure.WebJobs.Host.UnitTests", "1" } }).Build(); - _targetScalersInError = new HashSet(); + _targetScalerErrorRepository = new InMemoryTargetScalerErrorRepository(); } [Theory] @@ -84,7 +84,7 @@ public async Task GetScaleStatus_NoMonitors_ReturnsExpectedStatus(int workerCoun { WorkerCount = workerCount }; - ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _scaleOptions, _loggerFactory, _configuration); + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _targetScalerErrorRepository, _scaleOptions, _loggerFactory, _configuration); var status = await scaleManager.GetScaleStatusAsync(context); Assert.Equal(expected, status.Vote); @@ -152,7 +152,7 @@ public async Task GetScaleStatus_ReturnsExpectedResult(bool tbsEnabled) }); // Pass ConcurrencyOptions with DC enabled when TBS is enabled to exercise the concurrency snapshot path - ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration, tbsEnabled ? _concurrencyOptions : null); + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _targetScalerErrorRepository, options, _loggerFactory, _configuration, tbsEnabled ? _concurrencyOptions : null); var status = await scaleManager.GetScaleStatusAsync(context); @@ -212,7 +212,7 @@ public async Task GetScaleStatus_MonitorFails_ReturnsExpectedResult() }; _metricsRepositoryMock.Setup(p => p.ReadMetricsAsync(It.IsAny>())).ReturnsAsync(monitorMetrics); - ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _scaleOptions, _loggerFactory, _configuration); + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _targetScalerErrorRepository, _scaleOptions, _loggerFactory, _configuration); var status = await scaleManager.GetScaleStatusAsync(context); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -251,7 +251,7 @@ public async Task GetScaleStatus_TargetScalerFails_ReturnsExpectedResult() IsTargetScalingEnabled = true, }); - ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration, _concurrencyOptions); + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _targetScalerErrorRepository, options, _loggerFactory, _configuration, _concurrencyOptions); var status = await scaleManager.GetScaleStatusAsync(context); @@ -297,7 +297,7 @@ public void GetAggregateScaleVote_ReturnsExpectedResult(int workerCount, int num [InlineData(false, true, 1, 0)] [InlineData(true, false, 1, 0)] [InlineData(true, true, 0, 1)] - public void GetScalersToSample_Returns_Expected(bool targetBaseScalingEnabled, bool triggerEnabled, int expectedScaleMonitorCount, int expectedTargetScalerCount) + public async Task GetScalersToSample_Returns_Expected(bool targetBaseScalingEnabled, bool triggerEnabled, int expectedScaleMonitorCount, int expectedTargetScalerCount) { List scaleMonitors = new List { @@ -325,11 +325,12 @@ public void GetScalersToSample_Returns_Expected(bool targetBaseScalingEnabled, b .AddInMemoryCollection(new Dictionary { { "Microsoft.Azure.WebJobs.Host.UnitTests", triggerEnabled ? "1" : "0" } }).Build(); - var (scaleMonitorsToProcess, targetScalesToProcess) = ScaleManager.GetScalersToSample( + var (scaleMonitorsToProcess, targetScalesToProcess) = await ScaleManager.GetScalersToSample( scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, options, - configuration + configuration, + new NullTargetScalerErrorRepository() ); Assert.Equal(scaleMonitorsToProcess.Count(), expectedScaleMonitorCount); @@ -370,13 +371,14 @@ public async Task GetScalersToSample_FallsBackToMonitor_OnTargetScalerError() IsTargetScalingEnabled = true, }); - ScaleManager scaleManager = new ScaleManager(scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration); + ScaleManager scaleManager = new ScaleManager(scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _targetScalerErrorRepository, options, _loggerFactory, _configuration); - var (monitors1, scalers1) = ScaleManager.GetScalersToSample( //Col1 + var (monitors1, scalers1) = await ScaleManager.GetScalersToSample( //Col1 scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, _scaleOptions, - _configuration + _configuration, + new NullTargetScalerErrorRepository() ); Assert.Equal(monitors1.Count(), 0); Assert.Equal(scalers1.Count(), 2); @@ -388,11 +390,13 @@ public async Task GetScalersToSample_FallsBackToMonitor_OnTargetScalerError() Assert.Single(logs, x => x == "Function 'function1' error: Unable to use target based scaling, switching to metrics monitor."); _loggerProvider.ClearAllLogMessages(); - var (monitors2, scalers2) = ScaleManager.GetScalersToSample( + // After the error is recorded, subsequent calls should see the fallback + var (monitors2, scalers2) = await ScaleManager.GetScalersToSample( scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, _scaleOptions, - _configuration + _configuration, + _targetScalerErrorRepository ); Assert.Equal(monitors2.Count(), 1); Assert.Equal(scalers2.Count(), 1); @@ -421,7 +425,7 @@ public async Task GetTargetScalersResult_DynamicConcurrencyDisabled_DoesNotReadC var concurrencyStatusRepositoryMock = new Mock(MockBehavior.Strict); // No setup for ReadAsync - if it is called the strict mock will throw - ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration); + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, concurrencyStatusRepositoryMock.Object, _targetScalerErrorRepository, options, _loggerFactory, _configuration); var status = await scaleManager.GetScaleStatusAsync(context); @@ -471,12 +475,128 @@ public async Task GetTargetScalersResult_ReadsConcurrencyStatus_OnlyWhenDCAndPer }); } - ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration, concurrencyOptions); + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, concurrencyStatusRepositoryMock.Object, _targetScalerErrorRepository, options, _loggerFactory, _configuration, concurrencyOptions); await scaleManager.GetScaleStatusAsync(context); var expectedCalls = expectReadCalled ? Times.Once() : Times.Never(); concurrencyStatusRepositoryMock.Verify(p => p.ReadAsync(It.IsAny()), expectedCalls); } + + [Fact] + public async Task GetScalersToSample_WithErrorSet_FiltersCorrectly() + { + var scaleMonitors = new List + { + new TestScaleMonitor("func1-test-test", "func1"), + new TestScaleMonitor("func2-test-test", "func2") + }; + var scaleMonitorManagerMock = new Mock(MockBehavior.Strict); + scaleMonitorManagerMock.Setup(x => x.GetMonitors()).Returns(scaleMonitors); + + var targetScalers = new List + { + new TestTargetScaler { TargetScalerDescriptor = new TargetScalerDescriptor("func1") }, + new TestTargetScaler { TargetScalerDescriptor = new TargetScalerDescriptor("func2") } + }; + var targetScalerManagerMock = new Mock(MockBehavior.Strict); + targetScalerManagerMock.Setup(x => x.GetTargetScalers()).Returns(targetScalers); + + var options = Options.Create(new ScaleOptions { IsTargetScalingEnabled = true }); + + // Build an error repository with func1's scaler ID + string func1ScalerId = $"{typeof(TestTargetScaler).Assembly.GetName().Name}-func1"; + var errorRepo = new InMemoryTargetScalerErrorRepository(); + await errorRepo.AddAsync(func1ScalerId, CancellationToken.None); + + var (monitors, scalers) = await ScaleManager.GetScalersToSample( + scaleMonitorManagerMock.Object, + targetScalerManagerMock.Object, + options, + _configuration, + errorRepo); + + // func1 should fall back to monitor, func2 should stay as target scaler + Assert.Single(monitors); + Assert.Equal("func1", monitors[0].Descriptor.FunctionId); + Assert.Single(scalers); + Assert.Equal("func2", scalers[0].TargetScalerDescriptor.FunctionId); + } + + [Fact] + public async Task GetScalersToSample_WithNullErrorSet_NoFiltering() + { + var scaleMonitors = new List + { + new TestScaleMonitor("func1-test-test", "func1") + }; + var scaleMonitorManagerMock = new Mock(MockBehavior.Strict); + scaleMonitorManagerMock.Setup(x => x.GetMonitors()).Returns(scaleMonitors); + + var targetScalers = new List + { + new TestTargetScaler { TargetScalerDescriptor = new TargetScalerDescriptor("func1") } + }; + var targetScalerManagerMock = new Mock(MockBehavior.Strict); + targetScalerManagerMock.Setup(x => x.GetTargetScalers()).Returns(targetScalers); + + var options = Options.Create(new ScaleOptions { IsTargetScalingEnabled = true }); + + // Pass NullTargetScalerErrorRepository — should behave like empty + var (monitors, scalers) = await ScaleManager.GetScalersToSample( + scaleMonitorManagerMock.Object, + targetScalerManagerMock.Object, + options, + _configuration, + new NullTargetScalerErrorRepository()); + + Assert.Empty(monitors); // target scaler takes precedence + Assert.Single(scalers); + } + + [Fact] + public async Task GetScaleStatusAsync_SecondCall_ReadsPersistedError() + { + // First call: target scaler throws → error persisted + // Second call: error read from repo → scaler skipped, no second throw + var repo = new InMemoryTargetScalerErrorRepository(); + + var targetScalers = new List + { + new FaultyTargetScaler { TargetScalerDescriptor = new TargetScalerDescriptor("func1") } + }; + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(targetScalers); + + var monitors = new List + { + new TestScaleMonitor("func1-test-test", "func1") + }; + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(monitors); + + var options = Options.Create(new ScaleOptions { IsTargetScalingEnabled = true }); + var scaleManager = new ScaleManager( + _monitorManagerMock.Object, + _targetScalerManagerMock.Object, + _metricsRepositoryMock.Object, + _concurrencyStatusRepositoryMock.Object, + repo, + options, + _loggerFactory, + _configuration); + + var context = new ScaleStatusContext { WorkerCount = 1 }; + + // First call — triggers the error + await scaleManager.GetScaleStatusAsync(context); + var logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray(); + Assert.Single(logs, x => x.Contains("Unable to use target based scaling")); + _loggerProvider.ClearAllLogMessages(); + + // Second call — error is read from repo, scaler skipped, no throw + await scaleManager.GetScaleStatusAsync(context); + logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray(); + // The "Unable to use target based scaling" should NOT appear again + Assert.DoesNotContain(logs, x => x.Contains("Unable to use target based scaling")); + } } }