Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

#nullable enable

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.Host
{
internal class BlobStorageTargetScalerErrorRepository : ITargetScalerErrorRepository
{
private readonly IHostIdProvider _hostIdProvider;
private readonly ILogger _logger;
private readonly IAzureBlobStorageProvider _blobStorageProvider;
private BlobContainerClient? _blobContainerClient;

private const int MaxRetries = 3;
internal static readonly TimeSpan DefaultTtl = TimeSpan.FromMinutes(10);

public BlobStorageTargetScalerErrorRepository(IHostIdProvider hostIdProvider, ILoggerFactory loggerFactory, IAzureBlobStorageProvider azureStorageProvider)
{
_hostIdProvider = hostIdProvider;
_logger = loggerFactory.CreateLogger(LogCategories.Scale);
_blobStorageProvider = azureStorageProvider;
}

public async Task AddAsync(string scalerUniqueId, CancellationToken cancellationToken)
Comment thread
alrod marked this conversation as resolved.
{
try
{
for (int attempt = 0; attempt < MaxRetries; attempt++)
{
// Read current state with ETag
var (state, etag) = await ReadBlobWithETagAsync(cancellationToken);
var set = state?.Scalers ?? new HashSet<string>();
set.Add(scalerUniqueId);

var newState = new TargetScalerErrorState
{
Scalers = set,
LastUpdated = DateTime.UtcNow
};

try
{
await WriteBlobAsync(newState, etag, cancellationToken);
return;
}
catch (RequestFailedException ex) when (ex.Status == 412 || ex.Status == 409)
{
// ETag mismatch — another instance wrote concurrently, retry
}
}

_logger.LogWarning("Failed to persist target scaler error state after {MaxRetries} attempts due to concurrent updates.", MaxRetries);
}
catch (Exception e)
{
_logger.LogError(e, "Error persisting target scaler error state.");
}
}

public async Task<ISet<string>> GetAsync(CancellationToken cancellationToken)
{
try
{
var (state, _) = await ReadBlobWithETagAsync(cancellationToken);
if (state?.LastUpdated != null && (DateTime.UtcNow - state.LastUpdated.Value) > DefaultTtl)
{
// Data is stale — treat as empty so target scalers are re-evaluated
return new HashSet<string>();
}
return state?.Scalers ?? new HashSet<string>();
}
catch (Exception e)
{
_logger.LogError(e, "Error reading target scaler error state.");
return new HashSet<string>();
}
}

private async Task<(TargetScalerErrorState?, ETag)> ReadBlobWithETagAsync(CancellationToken cancellationToken)
{
string blobPath = await GetBlobPathAsync(cancellationToken);

try
{
BlobContainerClient? containerClient = await GetContainerClientAsync(cancellationToken);
if (containerClient != null)
{
BlobClient blobClient = containerClient.GetBlobClient(blobPath);
var response = await blobClient.DownloadAsync(cancellationToken: cancellationToken);

string content;
using (StreamReader reader = new StreamReader(response.Value.Content, true))
{
content = reader.ReadToEnd();
}

if (!string.IsNullOrEmpty(content))
{
var state = JsonConvert.DeserializeObject<TargetScalerErrorState>(content);
return (state, response.Value.Details.ETag);
}
}
}
catch (RequestFailedException exception) when (exception.Status == 404)
{
// blob doesn't exist yet — no errors recorded
return (null, default);
}

return (null, default);
}

private async Task WriteBlobAsync(TargetScalerErrorState state, ETag etag, CancellationToken cancellationToken)
{
string blobPath = await GetBlobPathAsync(cancellationToken);
BlobContainerClient? containerClient = await GetContainerClientAsync(cancellationToken);
if (containerClient != null)
{
BlobClient blobClient = containerClient.GetBlobClient(blobPath);
var content = JsonConvert.SerializeObject(state);
using (Stream stream = new MemoryStream(Encoding.UTF8.GetBytes(content)))
{
var options = new BlobUploadOptions();
if (etag != default)
{
// Existing blob — only write if it hasn't changed since we read it
options.Conditions = new BlobRequestConditions { IfMatch = etag };
}
else
{
// No blob exists yet — only create if it still doesn't exist
options.Conditions = new BlobRequestConditions { IfNoneMatch = ETag.All };
}
await blobClient.UploadAsync(stream, options, cancellationToken);
}
}
}

internal async Task<BlobContainerClient?> GetContainerClientAsync(CancellationToken cancellationToken)
{
if (_blobContainerClient == null && _blobStorageProvider.TryCreateHostingBlobContainerClient(out _blobContainerClient))
{
await _blobContainerClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
}

return _blobContainerClient;
}

internal async Task<string> GetBlobPathAsync(CancellationToken cancellationToken)
{
string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken);
return $"scale/{hostId}/targetScalersInError.json";
}

internal class TargetScalerErrorState
{
[JsonProperty("scalers")]
public HashSet<string> Scalers { get; set; } = new HashSet<string>();

[JsonProperty("lastUpdated")]
public DateTime? LastUpdated { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ public static void AddAzureStorageCoreServices(this IServiceCollection services)
services.TryAddSingleton<IAzureBlobStorageProvider, AzureStorageProvider>();

services.AddSingleton<IConcurrencyStatusRepository, BlobStorageConcurrencyStatusRepository>();
services.AddSingleton<ITargetScalerErrorRepository, BlobStorageTargetScalerErrorRepository>();
}

public static void AddAzureStorageScaleServices(this IServiceCollection services)
{
services.TryAddEnumerable(ServiceDescriptor.Transient<IConfigureOptions<JobHostInternalStorageOptions>, CoreWebJobsOptionsSetup<JobHostInternalStorageOptions>>());
services.TryAddSingleton<IAzureBlobStorageProvider, AzureStorageProvider>();
services.AddSingleton<IConcurrencyStatusRepository, BlobStorageConcurrencyStatusRepository>();
services.AddSingleton<ITargetScalerErrorRepository, BlobStorageTargetScalerErrorRepository>();
}

// This is only called if the host didn't already provide an implementation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio
services.AddOptionsLogging();
// Concurrency management
services.TryAddSingleton<IConcurrencyStatusRepository, NullConcurrencyStatusRepository>();

// Target scaler error state
services.TryAddSingleton<ITargetScalerErrorRepository, NullTargetScalerErrorRepository>();
services.TryAddSingleton<IHostProcessMonitor, DefaultHostProcessMonitor>();
services.TryAddSingleton<IConcurrencyThrottleManager, DefaultConcurrencyThrottleManager>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IConcurrencyThrottleProvider, HostHealthThrottleProvider>());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Host.Scale
{
/// <summary>
/// Provides functionality for persisting target scaler errors across multiple host instances.
/// When a target scaler throws <see cref="System.NotSupportedException"/>, the scaler identifier
/// is recorded so all instances can fall back to incremental scale monitoring.
/// </summary>
internal interface ITargetScalerErrorRepository
{
/// <summary>
/// Adds a target scaler identifier to the set of scalers in error.
/// </summary>
/// <param name="scalerUniqueId">The unique identifier of the target scaler.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A task that completes when the write is finished.</returns>
Task AddAsync(string scalerUniqueId, CancellationToken cancellationToken);

/// <summary>
/// Returns the set of target scaler identifiers currently in error.
/// </summary>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>A task that returns the set of scaler identifiers in error.</returns>
Task<ISet<string>> GetAsync(CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Host.Scale
{
internal class NullTargetScalerErrorRepository : ITargetScalerErrorRepository
{
public Task AddAsync(string scalerUniqueId, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public Task<ISet<string>> GetAsync(CancellationToken cancellationToken)
{
ISet<string> result = new HashSet<string>();
return Task.FromResult(result);
}
}
}
20 changes: 10 additions & 10 deletions src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ internal class ScaleManager : IScaleStatusProvider
private readonly ITargetScalerManager _targetScalerManager;
private readonly IScaleMetricsRepository _metricsRepository;
private readonly IConcurrencyStatusRepository _concurrencyStatusRepository;
private readonly ITargetScalerErrorRepository _targetScalerErrorRepository;
private readonly ILogger _logger;
private readonly IConfiguration _configuration;
private IOptions<ScaleOptions> _scaleOptions;
private IOptions<ConcurrencyOptions> _concurrencyOptions;
private static HashSet<string> _targetScalersInError = new HashSet<string>();

public ScaleManager(
IScaleMonitorManager monitorManager,
ITargetScalerManager targetScalerManager,
IScaleMetricsRepository metricsRepository,
IConcurrencyStatusRepository concurrencyStatusRepository,
ITargetScalerErrorRepository targetScalerErrorRepository,
IOptions<ScaleOptions> scaleConfiguration,
ILoggerFactory loggerFactory,
IConfiguration configuration,
Expand All @@ -41,8 +42,8 @@ public ScaleManager(
_targetScalerManager = targetScalerManager;
_metricsRepository = metricsRepository;
_concurrencyStatusRepository = concurrencyStatusRepository;
_targetScalerErrorRepository = targetScalerErrorRepository;
_logger = loggerFactory?.CreateLogger<ScaleManager>();
_targetScalersInError = new HashSet<string>();
_scaleOptions = scaleConfiguration;
_configuration = configuration;
_concurrencyOptions = concurrencyOptions;
Expand All @@ -60,7 +61,7 @@ internal ScaleManager()
/// <returns>A task that returns the <see cref="AggregateScaleStatus"/>.</returns>
public async Task<AggregateScaleStatus> GetScaleStatusAsync(ScaleStatusContext context)
{
var (scaleMonitorsToProcess, targetScalersToProcess) = GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration);
var (scaleMonitorsToProcess, targetScalersToProcess) = await GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration, _targetScalerErrorRepository);

var scaleStatuses = await GetScaleMonitorsResultAsync(context, scaleMonitorsToProcess);
var targetScalerResults = await GetTargetScalersResultAsync(context, targetScalersToProcess);
Expand Down Expand Up @@ -181,10 +182,7 @@ private async Task<IDictionary<string, TargetScalerResult>> GetTargetScalersResu
targetScaler.TargetScalerDescriptor.FunctionId,
ex);

lock (_targetScalersInError)
{
_targetScalersInError.Add(targetScalerUniqueId);
}
await _targetScalerErrorRepository.AddAsync(targetScalerUniqueId, CancellationToken.None);

// Adding ScaleVote.None vote
result = new TargetScalerResult
Expand Down Expand Up @@ -212,11 +210,12 @@ private async Task<IDictionary<string, TargetScalerResult>> GetTargetScalersResu
/// Returns scale monitors and target scalers we want to use based on the configuration.
/// Scaler monitor will be ignored if a target scaler is defined in the same extensions assembly and TBS is enabled.
/// </summary>
internal static (List<IScaleMonitor>, List<ITargetScaler>) GetScalersToSample(
internal static async Task<(List<IScaleMonitor>, List<ITargetScaler>)> GetScalersToSample(
IScaleMonitorManager monitorManager,
ITargetScalerManager targetScalerManager,
IOptions<ScaleOptions> scaleOptions,
IConfiguration configuration)
IConfiguration configuration,
ITargetScalerErrorRepository targetScalerErrorRepository)
{
var scaleMonitors = monitorManager.GetMonitors();
var targetScalers = targetScalerManager.GetTargetScalers();
Expand All @@ -228,10 +227,11 @@ internal static (List<IScaleMonitor>, List<ITargetScaler>) GetScalersToSample(
if (scaleOptions.Value.IsTargetScalingEnabled)
{
HashSet<string> targetScalerFunctions = new HashSet<string>();
var errored = await targetScalerErrorRepository.GetAsync(CancellationToken.None);
foreach (var scaler in targetScalers)
{
string scalerUniqueId = GetTargetScalerFunctionUniqueId(scaler);
if (!_targetScalersInError.Contains(scalerUniqueId))
if (!errored.Contains(scalerUniqueId))
{
string assemblyName = GetAssemblyName(scaler.GetType());
bool featureDisabled = configuration.GetValue<string>(assemblyName) == "0";
Expand Down
7 changes: 5 additions & 2 deletions src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal class ScaleMonitorService : IHostedService, IDisposable
private readonly IScaleMonitorManager _monitorManager;
private readonly ITargetScalerManager _targetScalerManager;
private readonly IConfiguration _configuration;
private readonly ITargetScalerErrorRepository _targetScalerErrorRepository;
private bool _disposed;

public ScaleMonitorService(
Expand All @@ -39,7 +40,8 @@ public ScaleMonitorService(
IScaleMonitorManager monitorManager,
ITargetScalerManager targetScalerManager,
IConfiguration configuration,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
ITargetScalerErrorRepository targetScalerErrorRepository = null)
{
_scaleStausProvider = scaleStausProvider;
_metricsRepository = metricsRepository;
Expand All @@ -50,6 +52,7 @@ public ScaleMonitorService(
_monitorManager = monitorManager;
_targetScalerManager = targetScalerManager;
_configuration = configuration;
_targetScalerErrorRepository = targetScalerErrorRepository ?? new NullTargetScalerErrorRepository();
}

public Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -92,7 +95,7 @@ private async Task TakeMetricsSamplesAsync()
{
try
{
var (scaleMonitorsToProcess, targetScalersToSample) = ScaleManager.GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration);
var (scaleMonitorsToProcess, targetScalersToSample) = await ScaleManager.GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration, _targetScalerErrorRepository);

if (scaleMonitorsToProcess.Any())
{
Expand Down
Loading