Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System;
using System.Linq;
Expand All @@ -22,8 +20,7 @@ public RedisListListener(string name, string connectionString, string key, TimeS
{
this.listPopFromBeginning = listPopFromBeginning;
this.logPrefix = $"[Name:{name}][Trigger:RedisListTrigger][Key:{key}]";
this.Descriptor = new ScaleMonitorDescriptor(name, $"{name}-RedisListTrigger-{key}");
this.TargetScalerDescriptor = new TargetScalerDescriptor($"{name}-RedisListTrigger-{key}");
this.scaleMonitor = new RedisListTriggerScaleMonitor(multiplexer, name, maxBatchSize, key);
}

public override void BeforePolling()
Expand Down Expand Up @@ -69,16 +66,5 @@ private Task ExecuteAsync(RedisValue value, CancellationToken cancellationToken)
{
return executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = value }, cancellationToken);
}

public override Task<RedisPollingTriggerBaseMetrics> GetMetricsAsync()
{
var metrics = new RedisPollingTriggerBaseMetrics
{
Remaining = multiplexer.GetDatabase().ListLength(key),
Timestamp = DateTime.UtcNow,
};

return Task.FromResult(metrics);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Microsoft.Azure.WebJobs.Host.Scale;
using StackExchange.Redis;
using System;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Redis
{
internal class RedisListTriggerScaleMonitor : RedisPollingTriggerBaseScaleMonitor
{
public RedisListTriggerScaleMonitor(IConnectionMultiplexer multiplexer,
string name,
int maxBatchSize,
string key)
: base(multiplexer, maxBatchSize, key)
{
this.Descriptor = new ScaleMonitorDescriptor(name, $"{name}-RedisListTrigger-{key}");
this.TargetScalerDescriptor = new TargetScalerDescriptor($"{name}-RedisListTrigger-{key}");
}

public override Task<RedisPollingTriggerBaseMetrics> GetMetricsAsync()
{
var metrics = new RedisPollingTriggerBaseMetrics
{
Remaining = multiplexer.GetDatabase().ListLength(key),
Timestamp = DateTime.UtcNow,
};

return Task.FromResult(metrics);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -14,9 +13,8 @@ namespace Microsoft.Azure.WebJobs.Extensions.Redis
/// <summary>
/// Responsible for polling a cache.
/// </summary>
internal abstract class RedisPollingTriggerBaseListener : IListener, IScaleMonitor, IScaleMonitor<RedisPollingTriggerBaseMetrics>, ITargetScaler
internal abstract class RedisPollingTriggerBaseListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
{
private const int MINIMUM_SAMPLES = 5;
internal string name;
internal string connectionString;
internal string key;
Expand All @@ -26,11 +24,9 @@ internal abstract class RedisPollingTriggerBaseListener : IListener, IScaleMonit
internal ILogger logger;

internal string logPrefix;
public ScaleMonitorDescriptor Descriptor { get; internal set; }
public TargetScalerDescriptor TargetScalerDescriptor { get; internal set; }

internal IConnectionMultiplexer multiplexer;
internal Version serverVersion;
internal RedisPollingTriggerBaseScaleMonitor scaleMonitor;

public RedisPollingTriggerBaseListener(string name, string connectionString, string key, TimeSpan pollingInterval, int maxBatchSize, ITriggeredFunctionExecutor executor, ILogger logger)
{
Expand Down Expand Up @@ -98,7 +94,7 @@ public virtual void BeforePolling() { }
public abstract Task PollAsync(CancellationToken cancellationToken);

/// <summary>
/// Any Redis commands necessart to run before the connection is terminated.
/// Any Redis commands necessary to run before the connection is terminated.
/// </summary>
public virtual void BeforeClosing() { }

Expand All @@ -114,50 +110,14 @@ private async Task Loop(CancellationToken cancellationToken)
}
}

async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
return await this.GetMetricsAsync().ConfigureAwait(false);
}

public abstract Task<RedisPollingTriggerBaseMetrics> GetMetricsAsync();

public ScaleStatus GetScaleStatus(ScaleStatusContext<RedisPollingTriggerBaseMetrics> context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
}

public ScaleStatus GetScaleStatus(ScaleStatusContext context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<RedisPollingTriggerBaseMetrics>().ToArray());
}

private ScaleStatus GetScaleStatusCore(int workerCount, RedisPollingTriggerBaseMetrics[] metrics)
public IScaleMonitor GetMonitor()
{
// don't scale up or down if we don't have enough metrics
if (metrics is null || metrics.Length < MINIMUM_SAMPLES)
{
return new ScaleStatus { Vote = ScaleVote.None };
}

double average = metrics.OrderByDescending(metric => metric.Timestamp).Take(MINIMUM_SAMPLES).Select(metric => metric.Remaining).Average();

if (workerCount * maxBatchSize < average)
{
return new ScaleStatus { Vote = ScaleVote.ScaleOut };
}

if ((workerCount - 1) * maxBatchSize > average)
{
return new ScaleStatus { Vote = ScaleVote.ScaleIn };
}

return new ScaleStatus { Vote = ScaleVote.None };
return scaleMonitor;
}

public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
public ITargetScaler GetTargetScaler()
{
RedisPollingTriggerBaseMetrics metric = await GetMetricsAsync();
return new TargetScalerResult() { TargetWorkerCount = (int)Math.Ceiling(metric.Remaining / (decimal)maxBatchSize) };
return scaleMonitor;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using Microsoft.Azure.WebJobs.Host.Scale;
using StackExchange.Redis;
using System;
using System.Linq;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Redis
{
internal abstract class RedisPollingTriggerBaseScaleMonitor: IScaleMonitor, IScaleMonitor<RedisPollingTriggerBaseMetrics>, ITargetScaler
{
private const int MINIMUM_SAMPLES = 5;

internal IConnectionMultiplexer multiplexer;
internal int maxBatchSize;
internal string key;

public RedisPollingTriggerBaseScaleMonitor(IConnectionMultiplexer multiplexer, int maxBatchSize, string key)
{
this.multiplexer = multiplexer;
this.maxBatchSize = maxBatchSize;
this.key = key;
}

public ScaleMonitorDescriptor Descriptor { get; internal set; }
public TargetScalerDescriptor TargetScalerDescriptor { get; internal set; }

async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
return await this.GetMetricsAsync().ConfigureAwait(false);
}

public abstract Task<RedisPollingTriggerBaseMetrics> GetMetricsAsync();

public ScaleStatus GetScaleStatus(ScaleStatusContext<RedisPollingTriggerBaseMetrics> context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
}

public ScaleStatus GetScaleStatus(ScaleStatusContext context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<RedisPollingTriggerBaseMetrics>().ToArray());
}

private ScaleStatus GetScaleStatusCore(int workerCount, RedisPollingTriggerBaseMetrics[] metrics)
{
// don't scale up or down if we don't have enough metrics
if (metrics is null || metrics.Length < MINIMUM_SAMPLES)
{
return new ScaleStatus { Vote = ScaleVote.None };
}

double average = metrics.OrderByDescending(metric => metric.Timestamp).Take(MINIMUM_SAMPLES).Select(metric => metric.Remaining).Average();

if (workerCount * maxBatchSize < average)
{
return new ScaleStatus { Vote = ScaleVote.ScaleOut };
}

if ((workerCount - 1) * maxBatchSize > average)
{
return new ScaleStatus { Vote = ScaleVote.ScaleIn };
}

return new ScaleStatus { Vote = ScaleVote.None };
}

public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
{
RedisPollingTriggerBaseMetrics metric = await GetMetricsAsync();
return new TargetScalerResult() { TargetWorkerCount = (int)Math.Ceiling(metric.Remaining / (decimal)maxBatchSize) };
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace Microsoft.Azure.WebJobs.Extensions.Redis
{
Expand All @@ -21,5 +23,31 @@ public static IWebJobsBuilder AddRedis(this IWebJobsBuilder builder)
builder.AddExtension<RedisExtensionConfigProvider>();
return builder;
}

/// <summary>
/// Adds the <see cref="RedisScalerProvider"/> to the provided <see cref="IWebJobsBuilder"/>.
/// </summary>
/// <param name="builder">The <see cref="IWebJobsBuilder"/> to add the <see cref="RedisScalerProvider"/> to.</param>
/// <param name="triggerMetadata">The metadata for the trigger.</param>
/// <returns></returns>
internal static IWebJobsBuilder AddRedisScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
IServiceProvider serviceProvider = null;
Lazy<RedisScalerProvider> scalerProvider = new Lazy<RedisScalerProvider>(() => new RedisScalerProvider(serviceProvider, triggerMetadata));

builder.Services.AddSingleton<IScaleMonitorProvider>(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});

builder.Services.AddSingleton<ITargetScalerProvider>(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});

return builder;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using StackExchange.Redis;
using System;

namespace Microsoft.Azure.WebJobs.Extensions.Redis
{
internal class RedisScalerProvider : IScaleMonitorProvider, ITargetScalerProvider
{
private readonly RedisPollingTriggerBaseScaleMonitor scaleMonitor;

public IScaleMonitor GetMonitor()
{
return scaleMonitor;
}

public ITargetScaler GetTargetScaler()
{
return scaleMonitor;
}

public RedisScalerProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata)
{
IConfiguration configuration = serviceProvider.GetService<IConfiguration>();
INameResolver nameResolver = serviceProvider.GetService<INameResolver>();

RedisPollingTriggerMetadata metadata = JsonConvert.DeserializeObject<RedisPollingTriggerMetadata>(triggerMetadata.Metadata.ToString());
IConnectionMultiplexer multiplexer = RedisExtensionConfigProvider.GetOrCreateConnectionMultiplexer(configuration, metadata.connectionStringSetting);
int maxBatchSize = metadata.maxBatchSize;
string key = RedisUtilities.ResolveString(nameResolver, metadata.key, nameof(metadata.key));

if (string.Equals(triggerMetadata.Type, "redisListTrigger", StringComparison.OrdinalIgnoreCase))
{
scaleMonitor = new RedisListTriggerScaleMonitor(multiplexer, triggerMetadata.FunctionName, maxBatchSize, key);
}
else if (string.Equals(triggerMetadata.Type, "redisStreamTrigger", StringComparison.OrdinalIgnoreCase))
{
scaleMonitor = new RedisStreamTriggerScaleMonitor(multiplexer, triggerMetadata.FunctionName, maxBatchSize, key);
}
else
{
throw new ArgumentException("Trigger is not the RedisStreamTrigger or the RedisListTrigger");
}
}

public class RedisPollingTriggerMetadata
{
[JsonProperty]
public string connectionStringSetting { get; set; }

[JsonProperty]
public string key { get; set; }

[JsonProperty]
public int maxBatchSize { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System;
Expand All @@ -23,10 +22,8 @@ public RedisStreamListener(string name, string connectionString, string key, Tim
{
this.consumerGroup = consumerGroup;
this.consumerName = Environment.GetEnvironmentVariable("WEBSITE_INSTANCE_ID") ?? Guid.NewGuid().ToString();

this.logPrefix = $"[Name:{name}][Trigger:RedisStreamTrigger][ConsumerGroup:{consumerGroup}][Key:{key}][Consumer:{consumerName}]";
this.Descriptor = new ScaleMonitorDescriptor(name, $"{name}-RedisStreamTrigger-{consumerGroup}-{key}");
this.TargetScalerDescriptor = new TargetScalerDescriptor($"{name}-RedisStreamTrigger-{consumerGroup}-{key}");
this.scaleMonitor = new RedisStreamTriggerScaleMonitor(multiplexer, name, maxBatchSize, key);
}

public override async void BeforePolling()
Expand Down Expand Up @@ -84,16 +81,5 @@ public async override void BeforeClosing()
long pending = await db.StreamDeleteConsumerAsync(key, consumerGroup, consumerName);
logger?.LogInformation($"{logPrefix} Successfully deleted consumer name '{consumerName}' from the consumer group '{consumerGroup}' for the stream at key '{key}'. There were {pending} pending messages for the consumer.");
}

public override Task<RedisPollingTriggerBaseMetrics> GetMetricsAsync()
{
var metrics = new RedisPollingTriggerBaseMetrics
{
Remaining = multiplexer.GetDatabase().StreamLength(key),
Timestamp = DateTime.UtcNow,
};

return Task.FromResult(metrics);
}
}
}
Loading