Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,6 @@ public static partial class EventHubWebJobsBuilderExtensions
{
public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddEventHubs(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder) { throw null; }
public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddEventHubs(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, System.Action<Microsoft.Azure.WebJobs.EventHubs.EventHubOptions> configure) { throw null; }
public static Microsoft.Azure.WebJobs.IWebJobsBuilder AddEventHubsScaleForTrigger(this Microsoft.Azure.WebJobs.IWebJobsBuilder builder, Microsoft.Azure.WebJobs.Host.Scale.TriggerMetadata triggerMetadata) { throw null; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Globalization;
using System.Net;
using Azure.Messaging.EventHubs.Consumer;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand Down Expand Up @@ -91,6 +92,26 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action<
return builder;
}

public static IWebJobsBuilder AddEventHubsScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
IServiceProvider serviceProvider = null;
Lazy<EventHubsScalerProvider> scalerProvider = new Lazy<EventHubsScalerProvider>(() => new EventHubsScalerProvider(serviceProvider, triggerMetadata));

builder.Services.AddSingleton<IScaleMonitorProvider>(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});

builder.Services.AddSingleton<ITargetScalerProvider>(resolvedServiceProvider =>
{
serviceProvider = serviceProvider ?? resolvedServiceProvider;
return scalerProvider.Value;
});

return builder;
}

internal static void ConfigureOptions(EventHubOptions options)
{
OffsetType? type = options?.InitialOffsetOptions?.Type;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Extensions.EventHubs.Listeners;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.EventHubs.Listeners
{
internal class EventHubsScalerProvider : IScaleMonitorProvider, ITargetScalerProvider
{
private readonly IScaleMonitor _scaleMonitor;
private readonly ITargetScaler _targetScaler;

public EventHubsScalerProvider(IServiceProvider serviceProvider, TriggerMetadata triggerMetadata)
{
AzureComponentFactory azureComponentFactory;
if ((triggerMetadata.Properties != null) && (triggerMetadata.Properties.TryGetValue(nameof(AzureComponentFactory), out object value)))
{
azureComponentFactory = value as AzureComponentFactory;
}
else
{
azureComponentFactory = serviceProvider.GetService<AzureComponentFactory>();
}

var configuration = serviceProvider.GetService<IConfiguration>();
var hostComponentFactory = serviceProvider.GetService<AzureComponentFactory>();
var logForwarder = serviceProvider.GetService<AzureEventSourceLogForwarder>();
var options = serviceProvider.GetService<IOptions<EventHubOptions>>();
var loggerFactory = serviceProvider.GetService<ILoggerFactory>();
var checkpointClientProvider = new CheckpointClientProvider(configuration, azureComponentFactory, logForwarder, loggerFactory.CreateLogger<BlobServiceClient>());
var nameResolver = serviceProvider.GetService<INameResolver>();
var eventHubMetadata = JsonConvert.DeserializeObject<EventHubMetadata>(triggerMetadata.Metadata.ToString());
var factory = new EventHubClientFactory(configuration, hostComponentFactory, options, nameResolver, logForwarder, checkpointClientProvider);
eventHubMetadata.ResolveProperties(serviceProvider.GetService<INameResolver>());
var eventHubConsumerClient = factory.GetEventHubConsumerClient(eventHubMetadata.EventHubName, eventHubMetadata.Connection, eventHubMetadata.ConsumerGroup);
var checkpointStore = new BlobCheckpointStoreInternal(
Comment thread
alrod marked this conversation as resolved.
factory.GetCheckpointStoreClient(),
triggerMetadata.FunctionName,
loggerFactory.CreateLogger<BlobCheckpointStoreInternal>());
var eventHubMerticsProvider = new EventHubMetricsProvider(
triggerMetadata.FunctionName,
eventHubConsumerClient,
checkpointStore,
loggerFactory.CreateLogger<EventHubMetricsProvider>()
);

_scaleMonitor = new EventHubsScaleMonitor(
triggerMetadata.FunctionName,
eventHubConsumerClient,
checkpointStore,
loggerFactory.CreateLogger<EventHubsScaleMonitor>());

_targetScaler = new EventHubsTargetScaler(
triggerMetadata.FunctionName,
eventHubConsumerClient,
options.Value,
eventHubMerticsProvider,
loggerFactory.CreateLogger<EventHubsScaleMonitor>());
}

public IScaleMonitor GetMonitor()
{
return _scaleMonitor;
}

public ITargetScaler GetTargetScaler()
{
return _targetScaler;
}

private class EventHubMetadata
{
[JsonProperty]
public string EventHubName { get; set; }

[JsonProperty]
public string ConsumerGroup { get; set; }

[JsonProperty]
public string Connection { get; set; }

public void ResolveProperties(INameResolver resolver)
{
if (resolver != null)
{
EventHubName = resolver.ResolveWholeString(EventHubName);
ConsumerGroup = resolver.ResolveWholeString(ConsumerGroup);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context,
_logger.LogInformation($"Desired target worker count of '{desiredWorkerCount}' is not in list of valid sorted workers: '{string.Join(",", sortedValidWorkerCounts)}'. Using next largest valid worker as target worker count.");
}

_logger.LogInformation($"'Target worker count for function '{_functionId}' is '{validatedTargetWorkerCount}' (EventHubName='{_client.EventHubName}', EventCount ='{eventCount}', Concurrency='{desiredConcurrency}', PartitionCount='{partitionCount}').");
_logger.LogInformation($"Target worker count for function '{_functionId}' is '{validatedTargetWorkerCount}' (EventHubName='{_client.EventHubName}', EventCount ='{eventCount}', Concurrency='{desiredConcurrency}', PartitionCount='{partitionCount}').");

return new TargetScalerResult
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<PackageReference Include="Moq" />
<PackageReference Include="NUnit" />
<PackageReference Include="NUnit3TestAdapter" />
<PackageReference Include="Microsoft.Azure.WebJobs.Host.Storage" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.TestFramework;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
using Azure.Messaging.EventHubs.Tests;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Azure.WebJobs.Host.EndToEndTests;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using NUnit.Framework;

namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests
{
[NonParallelizable]
[LiveOnly(true)]
public class ScaleHostEndToEndTests : WebJobsEventHubTestBase
{
private const string Function1Name = "Function1";
private const string Function2Name = "Function2";

private const string EventHubConnection1 = "EventHubConnection1";
private const string EventHubConnection2 = "EventHubConnection2";

/// <summary>
/// Performs the tasks needed to initialize each test. This
/// method runs once for the each test prior to running it.
/// </summary>
///
[SetUp]
public new async Task BaseSetUp()
{
_eventHubScope = await EventHubScope.CreateAsync(2, new List<string>() { "ConsumerGroup" });
}

public ScaleHostEndToEndTests() : base()
{
}

[Test]
[TestCase(false)]
[TestCase(true)]
public async Task ScaleHostEndToEndTest(bool tbsEnabled)
{
string hostJson =
@"{
""azureWebJobs"" : {
""extensions"": {
""eventHubs"": {
""targetUnprocessedEventThreshold"": 1
}
}
}
}";

// Function1Name uses connection string
// Function2Name uses AzureComponentFactory - simulating managed identity scenario in ScaleController
string triggers = $@"{{
""triggers"": [
{{
""name"": ""myQueueItem"",
""type"": ""eventHubsTrigger"",
""direction"": ""in"",
""eventHubName"": ""{_eventHubScope.EventHubName}"",
""consumerGroup"": ""{_eventHubScope.ConsumerGroups[0]}"",
""connection"": ""{EventHubConnection1}"",
""functionName"": ""{Function1Name}""
}},
{{
""name"": ""myQueueItem"",
""type"": ""serviceBusTrigger"",
""direction"": ""in"",
""eventHubName"": ""{_eventHubScope.EventHubName}"",
""consumerGroup"": ""{_eventHubScope.ConsumerGroups[1]}"",
""connection"": ""{EventHubConnection2}"",
""functionName"": ""{Function2Name}""
}}
]}}";

IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build();
AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService<AzureComponentFactory>();
TestComponentFactory factoryWrapper = new TestComponentFactory(defaultAzureComponentFactory, EventHubsTestEnvironment.Instance.Credential);

string hostId = "test-host";
var loggerProvider = new TestLoggerProvider();

IHostBuilder hostBuilder = new HostBuilder();
hostBuilder.ConfigureLogging(configure =>
{
configure.SetMinimumLevel(LogLevel.Debug);
configure.AddProvider(loggerProvider);
});
hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) =>
{
// Adding host.json here
config.AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(hostJson)));

var settings = new Dictionary<string, string>()
{
{ $"{EventHubConnection1}", EventHubsTestEnvironment.Instance.EventHubsConnectionString },
{ $"{EventHubConnection2}:fullyQualifiedNamespace", $"{EventHubsTestEnvironment.Instance.EventHubsNamespace}.servicebus.windows.net" },
{ "AzureWebJobsStorage", StorageTestEnvironment.Instance.StorageConnectionString }
};

// Adding app setting
config.AddInMemoryCollection(settings);
})
.ConfigureServices(services =>
{
services.AddAzureStorageScaleServices();

FakeNameResolver nameResolver = new FakeNameResolver();
nameResolver.Add(EventHubConnection1, EventHubsTestEnvironment.Instance.EventHubsConnectionString);
nameResolver.Add(EventHubConnection2, $"{EventHubsTestEnvironment.Instance.EventHubsNamespace}.servicebus.windows.net");
services.AddSingleton<INameResolver>(nameResolver);
})
.ConfigureWebJobsScale((context, builder) =>
{
builder.AddEventHubs();
builder.UseHostId(hostId);

foreach (var jtoken in JObject.Parse(triggers)["triggers"])
{
TriggerMetadata metadata = new TriggerMetadata(jtoken as JObject);
if (metadata.FunctionName == Function2Name)
{
metadata.Properties[nameof(AzureComponentFactory)] = factoryWrapper;
}
builder.AddEventHubsScaleForTrigger(metadata);
}
},
scaleOptions =>
{
scaleOptions.IsTargetScalingEnabled = tbsEnabled;
scaleOptions.MetricsPurgeEnabled = false;
scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4);
scaleOptions.IsRuntimeScalingEnabled = true;
scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1);
});

IHost scaleHost = hostBuilder.Build();
await scaleHost.StartAsync();

await using var producer = new EventHubProducerClient(EventHubsTestEnvironment.Instance.EventHubsConnectionString, _eventHubScope.EventHubName);
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });
await producer.SendAsync(new EventData[] { new EventData(new BinaryData("data")) });

var opstions = scaleHost.GetOptions<EventHubOptions>();
var blobContainerClient = new BlobContainerClient(StorageTestEnvironment.Instance.StorageConnectionString, opstions.CheckpointContainer);
var blobCheckpointStoreInternal1 = new BlobCheckpointStoreInternal(blobContainerClient, Function1Name, loggerProvider.CreateLogger("Test"));
await blobCheckpointStoreInternal1.CreateIfNotExistsAsync(CancellationToken.None);
var blobCheckpointStoreInternal2 = new BlobCheckpointStoreInternal(blobContainerClient, Function2Name, loggerProvider.CreateLogger("Test"));
await blobCheckpointStoreInternal2.CreateIfNotExistsAsync(CancellationToken.None);

await TestHelpers.Await(async () =>
{
IScaleStatusProvider scaleStatusProvider = scaleHost.Services.GetService<IScaleStatusProvider>();

var scaleStatus = await scaleStatusProvider.GetScaleStatusAsync(new ScaleStatusContext());

bool scaledOut = false;
if (!tbsEnabled)
{
scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == null
&& scaleStatus.FunctionScaleStatuses[Function1Name].Vote == ScaleVote.ScaleOut
&& scaleStatus.FunctionScaleStatuses[Function2Name].Vote == ScaleVote.ScaleOut;

if (scaledOut)
{
var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray();
Assert.Contains("2 scale monitors to sample", logMessages);
}
}
else
{
scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == 2
&& scaleStatus.FunctionTargetScalerResults[Function1Name].TargetWorkerCount == 2
&& scaleStatus.FunctionTargetScalerResults[Function2Name].TargetWorkerCount == 2;

if (scaledOut)
{
var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray();
Assert.Contains("2 target scalers to sample", logMessages);
}
}

if (scaledOut)
{
var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray();
Assert.IsNotEmpty(logMessages.Where(x => x.StartsWith("Runtime scale monitoring is enabled.")));
if (!tbsEnabled)
{
Assert.Contains("Scaling out based on votes", logMessages);
}
}

return scaledOut;
}, pollingInterval: 2000, timeout: 180000, throwWhenDebugging: true);
}
}
}