Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9faa500
initial commit
bachuv Apr 19, 2023
64a5484
first draft TBS
davidmrdavid Apr 21, 2023
9e56855
add no op scaler
davidmrdavid Apr 21, 2023
cd2ebe0
fix identation of dependency in csproj
davidmrdavid Apr 21, 2023
841be56
add error message to no-op target scaler
davidmrdavid Apr 21, 2023
adbe80a
add private build suffix
davidmrdavid Apr 21, 2023
5f79abe
change preview suffix
davidmrdavid Apr 21, 2023
7bc0077
introduce more conditional compilation to pass smoke tests
davidmrdavid Apr 25, 2023
f534ce5
pass stylecop
davidmrdavid Apr 25, 2023
0ad2d8e
add conditional compilation
davidmrdavid Apr 25, 2023
dc2c71a
patch conditional compilation exceptions
davidmrdavid Apr 25, 2023
706aa00
add unit test
davidmrdavid Apr 28, 2023
39b29d7
Merge branch 'main' of https://github.com/Azure/azure-functions-durab…
davidmrdavid Apr 28, 2023
78f9490
Merge branch 'dev' of https://github.com/Azure/azure-functions-durabl…
davidmrdavid Apr 28, 2023
70b6ebe
add comments
davidmrdavid Apr 28, 2023
bac5605
add unit tests
davidmrdavid May 1, 2023
776e9b6
Update src/WebJobs.Extensions.DurableTask/AzureStorageDurabilityProvi…
davidmrdavid May 11, 2023
22541c2
add logs and comments
davidmrdavid May 11, 2023
d94fa7f
Remove extra line
davidmrdavid May 11, 2023
5da5ec4
Merge branch 'dev' into dajusto/tbs
davidmrdavid Sep 20, 2023
4490d11
incorporate PR feedback
davidmrdavid Sep 21, 2023
c192695
remove DTFx.Listener improts
davidmrdavid Sep 21, 2023
2e6a317
remove old GetPerformanceMonitor implementation
davidmrdavid Sep 21, 2023
e251a81
pass stylecop
davidmrdavid Sep 21, 2023
7912e7a
add comments to explain when code runs in the ScaleController process
davidmrdavid Sep 21, 2023
8d03b9a
pass stylecop
davidmrdavid Sep 21, 2023
731d809
Add ScaleController V3 integration (#2462)
davidmrdavid Oct 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using DurableTask.AzureStorage.Tracking;
using DurableTask.Core;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the target-based scaling support still based on the old deprecated AS backend?

using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
Expand Down Expand Up @@ -200,6 +201,16 @@ internal static OrchestrationInstanceStatusQueryCondition ConvertWebjobsDurableC
}

#if !FUNCTIONS_V1

internal DurableTaskMetricsProvider GetMetricsProvider(
string functionName,
string hubName,
CloudStorageAccount storageAccount,
ILogger logger)
{
return new DurableTaskMetricsProvider(functionName, hubName, logger, performanceMonitor: null, storageAccount);
}

/// <inheritdoc/>
public override bool TryGetScaleMonitor(
string functionId,
Expand All @@ -208,12 +219,31 @@ public override bool TryGetScaleMonitor(
string connectionName,
out IScaleMonitor scaleMonitor)
{
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
scaleMonitor = new DurableTaskScaleMonitor(
functionId,
functionName,
hubName,
this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount(),
this.logger);
storageAccount,
this.logger,
metricsProvider);
return true;
}

#endif
#if FUNCTIONS_V3_OR_GREATER
public override bool TryGetTargetScaler(
string functionId,
string functionName,
string hubName,
string connectionName,
out ITargetScaler targetScaler)
{
// This is only called by the ScaleController, it doesn't run in the Functions Host process.
CloudStorageAccount storageAccount = this.storageAccountProvider.GetStorageAccountDetails(connectionName).ToCloudStorageAccount();
DurableTaskMetricsProvider metricsProvider = this.GetMetricsProvider(functionName, hubName, storageAccount, this.logger);
targetScaler = new DurableTaskTargetScaler(functionId, metricsProvider, this, this.logger);
return true;
}
#endif
Expand Down
22 changes: 22 additions & 0 deletions src/WebJobs.Extensions.DurableTask/DurabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -565,5 +565,27 @@ public virtual bool TryGetScaleMonitor(
return false;
}
#endif

#if FUNCTIONS_V3_OR_GREATER
/// <summary>
/// Tries to obtain a scaler for target based scaling.
/// </summary>
/// <param name="functionId">Function id.</param>
/// <param name="functionName">Function name.</param>
/// <param name="hubName">Task hub name.</param>
/// <param name="connectionName">The name of the storage-specific connection settings.</param>
/// <param name="targetScaler">The target-based scaler.</param>
/// <returns>True if target-based scaling is supported, false otherwise.</returns>
public virtual bool TryGetTargetScaler(
string functionId,
string functionName,
string hubName,
string connectionName,
out ITargetScaler targetScaler)
{
targetScaler = null;
return false;
}
#endif
}
}
82 changes: 6 additions & 76 deletions src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ public DurableTaskExtension(
this.nameResolver = nameResolver ?? throw new ArgumentNullException(nameof(nameResolver));
this.loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
this.PlatformInformationService = platformInformationService ?? throw new ArgumentNullException(nameof(platformInformationService));
this.ResolveAppSettingOptions();
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);

ILogger logger = loggerFactory.CreateLogger(LoggerCategoryName);

this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.LifeCycleNotificationHelper = lifeCycleNotificationHelper ?? this.CreateLifeCycleNotificationHelper();
this.durabilityProviderFactory = this.GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
this.durabilityProviderFactory = GetDurabilityProviderFactory(this.Options, logger, orchestrationServiceFactories);
this.defaultDurabilityProvider = this.durabilityProviderFactory.GetDurabilityProvider();
this.isOptionsConfigured = true;

Expand Down Expand Up @@ -249,6 +249,8 @@ public string HubName

internal DurableTaskOptions Options { get; }

internal DurabilityProvider DefaultDurabilityProvider => this.defaultDurabilityProvider;

internal HttpApiHandler HttpApiHandler { get; private set; }

internal ILifeCycleNotificationHelper LifeCycleNotificationHelper { get; private set; }
Expand Down Expand Up @@ -296,7 +298,7 @@ private MessagePayloadDataConverter CreateErrorDataConverter(IErrorSerializerSet
return new MessagePayloadDataConverter(errorSerializerSettingsFactory.CreateJsonSerializerSettings(), isDefault);
}

private IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
internal static IDurabilityProviderFactory GetDurabilityProviderFactory(DurableTaskOptions options, ILogger logger, IEnumerable<IDurabilityProviderFactory> orchestrationServiceFactories)
{
bool storageTypeIsConfigured = options.StorageProvider.TryGetValue("type", out object storageType);

Expand Down Expand Up @@ -578,32 +580,13 @@ private void StopLocalGrpcServer()
}
#endif

private void ResolveAppSettingOptions()
{
if (this.Options == null)
{
throw new InvalidOperationException($"{nameof(this.Options)} must be set before resolving app settings.");
}

if (this.nameResolver == null)
{
throw new InvalidOperationException($"{nameof(this.nameResolver)} must be set before resolving app settings.");
}

if (this.nameResolver.TryResolveWholeString(this.Options.HubName, out string taskHubName))
{
// use the resolved task hub name
this.Options.HubName = taskHubName;
}
}

private void InitializeForFunctionsV1(ExtensionConfigContext context)
{
#if FUNCTIONS_V1
context.ApplyConfig(this.Options, "DurableTask");
this.nameResolver = context.Config.NameResolver;
this.loggerFactory = context.Config.LoggerFactory;
this.ResolveAppSettingOptions();
DurableTaskOptions.ResolveAppSettingOptions(this.Options, this.nameResolver);
ILogger logger = this.loggerFactory.CreateLogger(LoggerCategoryName);
this.TraceHelper = new EndToEndTraceHelper(logger, this.Options.Tracing.TraceReplayEvents);
this.connectionInfoResolver = new WebJobsConnectionInfoProvider();
Expand Down Expand Up @@ -1573,59 +1556,6 @@ internal static void TagActivityWithOrchestrationStatus(OrchestrationRuntimeStat
activity.AddTag("DurableFunctionsRuntimeStatus", statusStr);
}
}

internal IScaleMonitor GetScaleMonitor(string functionId, FunctionName functionName, string connectionName)
{
if (this.defaultDurabilityProvider.TryGetScaleMonitor(
functionId,
functionName.Name,
this.Options.HubName,
connectionName,
out IScaleMonitor scaleMonitor))
{
return scaleMonitor;
}
else
{
// the durability provider does not support runtime scaling.
// Create an empty scale monitor to avoid exceptions (unless runtime scaling is actually turned on).
return new NoOpScaleMonitor($"{functionId}-DurableTaskTrigger-{this.Options.HubName}".ToLower());
}
}

/// <summary>
/// A placeholder scale monitor, can be used by durability providers that do not support runtime scaling.
/// This is required to allow operation of those providers even if runtime scaling is turned off
/// see discussion https://github.com/Azure/azure-functions-durable-extension/pull/1009/files#r341767018.
/// </summary>
private sealed class NoOpScaleMonitor : IScaleMonitor
{
/// <summary>
/// Construct a placeholder scale monitor.
/// </summary>
/// <param name="name">A descriptive name.</param>
public NoOpScaleMonitor(string name)
{
this.Descriptor = new ScaleMonitorDescriptor(name);
}

/// <summary>
/// A descriptive name.
/// </summary>
public ScaleMonitorDescriptor Descriptor { get; private set; }

/// <inheritdoc/>
Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
}

/// <inheritdoc/>
ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
throw new InvalidOperationException("The current DurableTask backend configuration does not support runtime scaling");
}
}
#endif
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Net.Http;
using System.Threading;
using System.Collections.Generic;
using System.Linq;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Auth;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Options;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
using Microsoft.Azure.WebJobs.Host.Scale;
Comment thread
davidmrdavid marked this conversation as resolved.
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
#else
using Microsoft.Azure.WebJobs.Extensions.DurableTask.ContextImplementations;
Expand Down Expand Up @@ -109,6 +112,27 @@ public static IWebJobsBuilder AddDurableTask(this IWebJobsBuilder builder)
return builder;
}

#if FUNCTIONS_V3_OR_GREATER
/// <summary>
/// Adds the <see cref="IScaleMonitor"/> and <see cref="ITargetScaler"/> providers for the Durable Triggers.
/// </summary>
/// <param name="builder">The <see cref="IWebJobsBuilder"/> to configure.</param>
/// <returns>Returns the provided <see cref="IWebJobsBuilder"/>.</returns>
internal static IWebJobsBuilder AddDurableScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
// this segment adheres to the followings pattern: https://github.com/Azure/azure-sdk-for-net/pull/38756
DurableTaskTriggersScaleProvider provider = null;
builder.Services.AddSingleton(serviceProvider =>
{
provider = new DurableTaskTriggersScaleProvider(serviceProvider.GetService<IOptions<DurableTaskOptions>>(), serviceProvider.GetService<INameResolver>(), serviceProvider.GetService<ILoggerFactory>(), serviceProvider.GetService<IEnumerable<IDurabilityProviderFactory>>(), triggerMetadata);
return provider;
});
builder.Services.AddSingleton<IScaleMonitorProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
builder.Services.AddSingleton<ITargetScalerProvider>(serviceProvider => serviceProvider.GetServices<DurableTaskTriggersScaleProvider>().Single(x => x == provider));
return builder;
}
#endif

/// <summary>
/// Adds the Durable Task extension to the provided <see cref="IWebJobsBuilder"/>.
/// </summary>
Expand Down
38 changes: 31 additions & 7 deletions src/WebJobs.Extensions.DurableTask/Listener/DurableTaskListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
// Licensed under the MIT License. See LICENSE in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Monitoring;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Extensions.DurableTask.Scale;
using Microsoft.Azure.WebJobs.Host.Listeners;
#if !FUNCTIONS_V1
using Microsoft.Azure.WebJobs.Host.Scale;
#endif

namespace Microsoft.Azure.WebJobs.Extensions.DurableTask
{
#if !FUNCTIONS_V1
#if FUNCTIONS_V3_OR_GREATER
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
#elif FUNCTIONS_V2_OR_GREATER
internal sealed class DurableTaskListener : IListener, IScaleMonitorProvider
#else
internal sealed class DurableTaskListener : IListener
Expand All @@ -26,10 +25,15 @@ internal sealed class DurableTaskListener : IListener
private readonly FunctionName functionName;
private readonly FunctionType functionType;
private readonly string connectionName;

#if !FUNCTIONS_V1
private readonly Lazy<IScaleMonitor> scaleMonitor;
#endif

#if FUNCTIONS_V3_OR_GREATER
private readonly Lazy<ITargetScaler> targetScaler;
#endif

public DurableTaskListener(
DurableTaskExtension config,
string functionId,
Expand All @@ -48,12 +52,25 @@ public DurableTaskListener(
this.functionName = functionName;
this.functionType = functionType;
this.connectionName = connectionName;

#if !FUNCTIONS_V1
this.scaleMonitor = new Lazy<IScaleMonitor>(() =>
this.config.GetScaleMonitor(
ScaleUtils.GetScaleMonitor(
this.config.DefaultDurabilityProvider,
this.functionId,
this.functionName,
this.connectionName));
this.connectionName,
this.config.Options.HubName));

#endif
#if FUNCTIONS_V3_OR_GREATER
this.targetScaler = new Lazy<ITargetScaler>(() =>
ScaleUtils.GetTargetScaler(
this.config.DefaultDurabilityProvider,
this.functionId,
this.functionName,
this.connectionName,
this.config.Options.HubName));
#endif
}

Expand Down Expand Up @@ -98,5 +115,12 @@ public IScaleMonitor GetMonitor()
return this.scaleMonitor.Value;
}
#endif

#if FUNCTIONS_V3_OR_GREATER
public ITargetScaler GetTargetScaler()
{
return this.targetScaler.Value;
}
#endif
}
}
Loading