Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
13b8405
draft implementation
davidmrdavid May 2, 2023
62e988d
only instantiate necessary services
davidmrdavid May 8, 2023
aac6b0b
refactor
davidmrdavid May 10, 2023
c97113d
remove connectionName changes
davidmrdavid May 10, 2023
35d1fc8
add storage provider payload
davidmrdavid May 11, 2023
694af1c
Merge branch 'dajusto/tbs' of https://github.com/Azure/azure-function…
davidmrdavid May 11, 2023
4580fb7
remove unused import
davidmrdavid May 11, 2023
34cf4dc
add version suffix
davidmrdavid May 19, 2023
c69eb6e
update version suffix
davidmrdavid May 20, 2023
d2b963c
hack: use CreateLogger<T> over CreateLogger(<string>) because that's …
davidmrdavid May 23, 2023
f5be838
reorder statements
davidmrdavid May 23, 2023
e8a9f16
remove logging hack
davidmrdavid Sep 20, 2023
59e73e0
Remove logging message
davidmrdavid Sep 20, 2023
ab423bc
remove empty constructor
davidmrdavid Sep 20, 2023
847c3c6
remove unused imports
davidmrdavid Sep 20, 2023
a755660
incorporate some PR feedback
davidmrdavid Sep 21, 2023
4c4bc83
Merge branch 'dajusto/tbs' of https://github.com/Azure/azure-function…
davidmrdavid Sep 21, 2023
b6cf5d8
Update src/WebJobs.Extensions.DurableTask/DurableTaskJobHostConfigura…
davidmrdavid Sep 22, 2023
1c82e28
incorporate some PR feedback
davidmrdavid Sep 22, 2023
4b02438
add comments back
davidmrdavid Sep 22, 2023
5e7fc18
refactor all scaling utils to the static ScaleUtils class
davidmrdavid Sep 22, 2023
3800635
remove lazy initialization pattern
davidmrdavid Sep 25, 2023
becd05e
addressed some PR feedback
bachuv Oct 3, 2023
0533264
remove double error loggging
davidmrdavid Oct 5, 2023
1975b41
Address DI bug
davidmrdavid Oct 5, 2023
4a061d4
Update src/WebJobs.Extensions.DurableTask/Listener/DurableTaskTargetS…
davidmrdavid Oct 5, 2023
f5c6d3f
remove double logging of exception
davidmrdavid Oct 5, 2023
635aa75
add ScaleUtils unit tests
davidmrdavid Oct 5, 2023
3ad0f28
expand scaleUtils tests
davidmrdavid Oct 5, 2023
9380a70
InvalidOperationException changed for NotSupportedException
davidmrdavid Oct 5, 2023
7f47657
change ArithmeticException to InvalidOperationException in DurableTas…
davidmrdavid Oct 5, 2023
f29c14d
patch unit tests
davidmrdavid Oct 5, 2023
e15344b
add E2E scaler test
davidmrdavid Oct 6, 2023
e2595bf
removing unused imports
davidmrdavid Oct 6, 2023
c06b641
pass build errors
davidmrdavid Oct 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 6 additions & 125 deletions src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ public DurableTaskExtension(
this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService));
this.ResolveAppSettingOptions();
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);

Comment on lines 153 to 154
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made this method static so it may be re-used in DurableTaskTriggersScaleProvider

ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName);

this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper();
this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
this.durabilityProviderFactory = GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also made this static so it may be re-used in GetDurabilityProviderFactory.

this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider();
this.isOptionsConfigured = true;

Expand Down Expand Up @@ -249,6 +249,8 @@ public string HubName

internal DurableTaskOptions Options { get; }

internal DurabilityProvider DefaultDurabilityProvider => this.defaultDurabilityProvider;

internal HttpApiHandler HttpApiHandler { get; private set; }

internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; }
Expand Down Expand Up @@ -296,7 +298,7 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}

private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
internal static IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
{
bool storageTypeIsConfigured = options.StorageProvider.TryGetValue("type", out object storageType);

Expand Down Expand Up @@ -578,32 +580,13 @@ private void StopLocalGrpcServer()
}
#endif

private void ResolveAppSettingOptions()
{
if (this.Options == null)
{
throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings.");
}

if (this.nameResolver == null)
{
throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings.");
}

if (this.nameResolver.TryResolveWholeString(this.Options.HubName, out string taskHubName))
{
// use the resolved task hub name
this.Options.HubName = taskHubName;
}
}

private void InitializeForFunctionsV1(ExtensionConfigContext context)
{
#if FUNCTIONS_V1
context.ApplyConfig(this.Options, "DurableTask");
this.nameResolver = context.Config.NameResolver;
this.loggerFactory = context.Config.LoggerFactory;
this.ResolveAppSettingOptions();
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);
ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName);
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.connectionInfoResolver = new WebJobsConnectionInfoProvider();
Expand Down Expand Up @@ -1573,108 +1556,6 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat
activity.AddTag("DurableFunctionsRuntimeStatus", statusStr);
}
}

internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string connectionName)
{
if (this.defaultDurabilityProvider.TryGetScaleMonitor(
functionId,
functionName.Name,
this.Options.HubName,
connectionName,
out IScaleMonitor scaleMonitor))
{
return scaleMonitor;
}
else
{
// the durability provider does not support runtime scaling.
// Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower(), functionId);
}
}
#endif
#if FUNCTIONS_V3_OR_GREATER

internal ITargetScaler GetTargetScaler(string functionId, FunctionName functionName, string connectionName)
{
if (this.defaultDurabilityProvider.TryGetTargetScaler(
functionId,
functionName.Name,
this.Options.HubName,
connectionName,
out ITargetScaler targetScaler))
{
return targetScaler;
}
else
{
// the durability provider does not support target-based scaling.
// Create an empty target scaler to avoid exceptions (unless target-based scaling is actually turned on).
return new NoOpTargetScaler(functionId);
}
}

private sealed class NoOpTargetScaler : ITargetScaler
{
/// <summary>
/// Construct a placeholder target scaler.
/// </summary>
/// <param name="functionId">The function ID.</param>
public NoOpTargetScaler(string functionId)
{
this.TargetScalerDescriptor = new TargetScalerDescriptor(functionId);
}

public TargetScalerDescriptor TargetScalerDescriptor { get; }

public Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
{
throw new NotImplementedException("The current DurableTask backend configuration does not support target-based scaling");
}
}
#endif

#if !FUNCTIONS_V1
/// <summary>
/// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
/// This is required to allow operation of those providers even if runtime scaling is turned off
/// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
/// </summary>
private sealed class NoOpScaleMonitor : IScaleMonitor
{
/// <summary>
/// Construct a placeholder scale monitor.
/// </summary>
/// <param name="name">A descriptive name.</param>
/// <param name="functionId">The function ID.</param>
public NoOpScaleMonitor(string name, string functionId)
{
#if FUNCTIONS_V3_OR_GREATER
this.Descriptor = new ScaleMonitorDescriptor(name, functionId);
#else
#pragma warning disable CS0618 // Type or member is obsolete
this.Descriptor = new ScaleMonitorDescriptor(name);
#pragma warning restore CS0618 // Type or member is obsolete
#endif
}

/// <summary>
/// A descriptive name.
/// </summary>
public ScaleMonitorDescriptor Descriptor { get; private set; }

/// <inheritdoc/>
Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
}

/// <inheritdoc/>
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
}
}
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
#else
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
Expand Down Expand Up @@ -107,6 +112,27 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder)
return builder;
}

#if FUNCTIONS_V3_OR_GREATER
/// <summary>
/// Adds the <see cref="IScaleMonitor"/> and <see cref="ITargetScaler"/> providers for the Durable Triggers.
/// </summary>
/// <param name="builder">The <see cref="IWebJobsBuilder"/> to configure.</param>
/// <returns>Returns the provided <see cref="IWebJobsBuilder"/>.</returns>
internal static IWebJobsBuilder AddDurableScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
// this segment adheres to the followings pattern: https://github.com/Azure/azure-sdk-for-net/pull/38756
DurableTaskTriggersScaleProvider provider = null;
builder.Services.AddSingleton(serviceProvider =>
{
provider = new DurableTaskTriggersScaleProvider(serviceProvider.GetService<IOptions<DurableTaskOptions>>(), serviceProvider.GetService<INameResolver>(), serviceProvider.GetService<ILoggerFactory>(), serviceProvider.GetService<IEnumerable<IDurabilityProviderFactory>>(), triggerMetadata);
return provider;
});
builder.Services.AddSingleton<IScaleMonitorProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
builder.Services.AddSingleton<ITargetScalerProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
return builder;
}
#endif

/// <summary>
/// Adds the Durable Task extension to the provided <see cref="IWebJobsBuilder"/>.
/// </summary>
Expand Down
15 changes: 10 additions & 5 deletions src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
using Microsoft.Azure.WebJobs.Host.Listeners;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
Expand Down Expand Up @@ -51,21 +52,25 @@ public DurableTaskListener(
this.functionName = functionName;
this.functionType = functionType;
this.connectionName = connectionName;
#if !FUNCTIONS_V1

#if !FUNCTIONS_V1
this.scaleMonitor = new Lazy<IScaleMonitor>(() =>
this.config.GetScaleMonitor(
ScaleUtils.GetScaleMonitor(
this.config.DefaultDurabilityProvider,
this.functionId,
this.functionName,
this.connectionName));
this.connectionName,
this.config.Options.HubName));

#endif
#if FUNCTIONS_V3_OR_GREATER
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In SC we have "FUNCTIONS_V3_OR_GREATER", right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so. SC has TFM .net6.0 and FUNCTIONS_V3_OR_GREATER targets netcoreapp3.1, which is "older" than .net6.0. So yes, I believe this code triggers in SC.

this.targetScaler = new Lazy<ITargetScaler>(() =>
this.config.GetTargetScaler(
ScaleUtils.GetTargetScaler(
this.config.DefaultDurabilityProvider,
this.functionId,
this.functionName,
this.connectionName));
this.connectionName,
this.config.Options.HubName));
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public DurableTaskScaleMonitor(
// We need this because the new ScaleMonitorDescriptor constructor is not compatible with the WebJobs version of Functions V1 and V2.
// Technically, it is also not available in Functions V3, but we don't have a TFM allowing us to differentiate between Functions V3 and V4.
this.scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{this.functionId}-DurableTaskTrigger-{this.hubName}".ToLower());
#pragma warning restore CS0618 // Type or member is obsolete
#pragma warning restore CS0618 // Type or member is obsolete. However, the new interface is not compatible with Functions V2 and V1
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

#if FUNCTIONS_V3_OR_GREATER
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -38,46 +39,55 @@ public DurableTaskTargetScaler(string functionId, DurableTaskMetricsProvider met

public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
{
// This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process.
var metrics = await this.metricsProvider.GetMetricsAsync();

// compute activityWorkers: the number of workers we need to process all activity messages
var workItemQueueLength = metrics.WorkItemQueueLength;
double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities);

var serializedControlQueueLengths = metrics.ControlQueueLengths;
var controlQueueLengths = JsonConvert.DeserializeObject<IReadOnlyList<int>>(serializedControlQueueLengths);

var controlQueueMessages = controlQueueLengths.Sum();
var activeControlQueues = controlQueueLengths.Count(x => x > 0);

// compute orchestratorWorkers: the number of workers we need to process all orchestrator messages.
// We bound this result to be no larger than the partition count
var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators);
var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers);

int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers);
this.scaleResult.TargetWorkerCount = numWorkersToRequest;

// When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table.
// This works because this code does not execute in the Functions Host process, but in the ScaleController process,
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var scaleControllerLog = $"Target worker count for {this.functionId}: {numWorkersToRequest}. " +
$"Metrics used: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";

// target worker count should never be negative
if (numWorkersToRequest < 0)
DurableTaskTriggerMetrics? metrics = null;
Copy link
Copy Markdown
Member

@alrod alrod Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To fall back to regular scale for Netherite and MSSQL you need to throw NonSupportedException:

azure-sdk-for-net/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusTargetScaler.cs at main · Azure/azure-sdk-for-net (github.com)

Do you have this implemented for here?
Can we add a test for this?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the exception here.

Yes, we can add a test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was incorporated in my latest commit

try
{
scaleControllerLog = "Tried to request a negative worker count." + scaleControllerLog;
this.logger.LogError(scaleControllerLog);

// Throw exception so ScaleController can handle the error.
throw new Exception(scaleControllerLog);
// This method is only invoked by the ScaleController, so it doesn't run in the Functions Host process.
metrics = await this.metricsProvider.GetMetricsAsync();

// compute activityWorkers: the number of workers we need to process all activity messages
var workItemQueueLength = metrics.WorkItemQueueLength;
double activityWorkers = Math.Ceiling(workItemQueueLength / (double)this.MaxConcurrentActivities);

var serializedControlQueueLengths = metrics.ControlQueueLengths;
var controlQueueLengths = JsonConvert.DeserializeObject<IReadOnlyList<int>>(serializedControlQueueLengths);

var controlQueueMessages = controlQueueLengths.Sum();
var activeControlQueues = controlQueueLengths.Count(x => x > 0);

// compute orchestratorWorkers: the number of workers we need to process all orchestrator messages.
// We bound this result to be no larger than the partition count
var upperBoundControlWorkers = Math.Ceiling(controlQueueMessages / (double)this.MaxConcurrentOrchestrators);
var orchestratorWorkers = Math.Min(activeControlQueues, upperBoundControlWorkers);

int numWorkersToRequest = (int)Math.Max(activityWorkers, orchestratorWorkers);
this.scaleResult.TargetWorkerCount = numWorkersToRequest;

// When running on ScaleController V3, ILogger logs are forwarded to the ScaleController's Kusto table.
// This works because this code does not execute in the Functions Host process, but in the ScaleController process,
// and the ScaleController is injecting it's own custom ILogger implementation that forwards logs to Kusto.
var metricsLog = $"Metrics: workItemQueueLength={workItemQueueLength}. controlQueueLengths={serializedControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var scaleControllerLog = $"Target worker count for '{this.functionId}' is '{numWorkersToRequest}'. " +
metricsLog;

// target worker count should never be negative
if (numWorkersToRequest < 0)
{
throw new InvalidOperationException("Number of workers to request cannot be negative");
}

this.logger.LogInformation(scaleControllerLog);
return this.scaleResult;
}
catch (Exception ex)
{
// We want to augment the exception with metrics information for investigation purposes
var metricsLog = $"Metrics: workItemQueueLength={metrics?.WorkItemQueueLength}. controlQueueLengths={metrics?.ControlQueueLengths}. " +
$"maxConcurrentOrchestrators={this.MaxConcurrentOrchestrators}. maxConcurrentActivities={this.MaxConcurrentActivities}";
var errorLog = $"Error: target worker count for '{this.functionId}' resulted in exception. " + metricsLog;
throw new Exception(errorLog, ex);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we're effectively double-logging the exception here my including the same exception information in both errorLog and as an inner exception ex. This can be confusing for someone trying to debug scale controller logs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I think I was being overly verbose here. I've updated the code to remove the string representation of the error and instead just rely on the "innerException" parameter: f5c6d3f

}

this.logger.LogDebug(scaleControllerLog);
return this.scaleResult;
}
}
}
Expand Down
Loading