Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
183f33a
Merge pull request #1 from Azure/main
seilorjunior Aug 15, 2022
37babd3
Merge pull request #2 from Azure/main
seilorjunior Aug 16, 2022
fa0ea7a
Merge pull request #3 from Azure/main
seilorjunior Aug 18, 2022
8f01720
adding the correct deps
seilorjunior Aug 18, 2022
9acdacc
Merge branch 'main' of https://github.com/seilorjunior/azure-sdk-for-net
seilorjunior Aug 18, 2022
b4a97bd
Merge branch 'Azure:main' into main
seilorjunior Aug 18, 2022
ccf40fd
Merge branch 'Azure:main' into main
seilorjunior Aug 19, 2022
0100ebe
Merge branch 'Azure:main' into main
seilorjunior Sep 22, 2022
2b1885a
Merge branch 'Azure:main' into main
seilorjunior Sep 29, 2022
5e1dfd6
Merge branch 'Azure:main' into main
seilorjunior Oct 10, 2022
5458b37
changes for storagequeue
seilorjunior Oct 17, 2022
25f145e
adding test
seilorjunior Oct 24, 2022
e2c72da
changes from the review
seilorjunior Oct 26, 2022
97b10fa
use _functionId instead of functionId to prevent null err
Oct 27, 2022
5a20852
make metricsprovider and targetscaler classes public, add xml docs
Oct 28, 2022
55ef9f8
separate QueueScaleMonitor, add xml docs
Oct 28, 2022
51f954a
fix unit tests
Oct 31, 2022
3e25d84
cast queuelistener in isharedlistener
Nov 2, 2022
ecd50aa
make targetscaler, scalemonitor, and metricsproviderclasses back to i…
Nov 7, 2022
6c0d4bc
Update sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/…
chiangvincent Nov 14, 2022
782883d
Update sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/…
chiangvincent Nov 14, 2022
0646743
Update sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/s…
chiangvincent Nov 14, 2022
bc6626e
responding to comments
Nov 14, 2022
d477ffc
Merge branch 'new-storagequeue' of https://github.com/seilorjunior/az…
Nov 14, 2022
4904067
move queuetargetscaler to queues dir
Nov 15, 2022
08124a1
move metricsprovider to queues folder, add unit tests for target scaler
Nov 15, 2022
1e7ff2d
create getqueuelength to use for tbs
Nov 17, 2022
036ff5a
move tests to queueMetricsProviderTests
Nov 17, 2022
ac77dee
make queuemetricsprovider implement livetestbase
Nov 17, 2022
ce43b2a
upgrade webjobs-sdk version, respond to alexey's comments
Nov 18, 2022
947d248
implement itargetscalerprovider in bloblistener
Nov 21, 2022
4676331
add async suffix to getqueuelength, copy over exception handling
Nov 21, 2022
08e78b5
fix merge conflict
Jan 27, 2023
ed80b31
fix bloblistener tests, create mock ctors for targetscaler and scalem…
Jan 30, 2023
ec163a5
remove internal nuget feed reference, update webjobs version to 3.0.3…
Feb 9, 2023
215c50a
update .sources packes to 3.0.36
Feb 13, 2023
799a82a
fix typo
Feb 13, 2023
b028f9d
add queue name for logging
Feb 13, 2023
675b11c
fix merge conflict
Feb 13, 2023
ce845d3
fix broken unit tests
Feb 13, 2023
df01381
accept merge conflict, remove scale monitor descriptor in queue liste…
Feb 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
internal sealed class BlobListener : IListener, IScaleMonitorProvider
internal sealed class BlobListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
{
private readonly ISharedListener _sharedListener;
private readonly ILogger<BlobListener> _logger;
Expand Down Expand Up @@ -113,5 +113,10 @@ public IScaleMonitor GetMonitor()
// so the shared queue won't be monitored.
return ((IScaleMonitorProvider)_sharedListener).GetMonitor();
}

public ITargetScaler GetTargetScaler()
{
return ((ITargetScalerProvider)_sharedListener).GetTargetScaler();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
internal sealed class SharedBlobQueueListener : ISharedListener, IScaleMonitorProvider
internal sealed class SharedBlobQueueListener : ISharedListener, IScaleMonitorProvider, ITargetScalerProvider
{
private readonly IListener _listener;
private readonly BlobQueueTriggerExecutor _executor;
Expand Down Expand Up @@ -73,7 +73,12 @@ public void Dispose()

public IScaleMonitor GetMonitor()
{
return (IScaleMonitor)_listener;
return ((IScaleMonitorProvider)_listener).GetMonitor();
}

public ITargetScaler GetTargetScaler()
{
return ((ITargetScalerProvider)_listener).GetTargetScaler();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class BlobListenerTests
public void GetMonitor_ReturnsSharedMonitor()
{
var queueListener = new QueueListener();

var watcherMock = new Mock<IBlobWrittenWatcher>(MockBehavior.Strict);
var executor = new BlobQueueTriggerExecutor(BlobTriggerSource.LogsAndContainerScan, watcherMock.Object, NullLogger<BlobListener>.Instance);
var sharedBlobQueueListener = new SharedBlobQueueListener(queueListener, executor);
Expand All @@ -25,7 +26,13 @@ public void GetMonitor_ReturnsSharedMonitor()
var monitor2 = blobListener1.GetMonitor();

Assert.AreSame(monitor1, monitor2);
Assert.AreSame(monitor1, queueListener);
Assert.AreSame(monitor1, queueListener.GetMonitor());

var targetScaler1 = blobListener1.GetTargetScaler();
var targetScaler2 = blobListener1.GetTargetScaler();

Assert.AreSame(targetScaler1, targetScaler2);
Assert.AreSame(targetScaler1, queueListener.GetTargetScaler());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Threading;
Expand All @@ -25,10 +24,8 @@

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners
{
internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INotificationCommand, IScaleMonitor<QueueTriggerMetrics>
internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INotificationCommand, ITargetScalerProvider, IScaleMonitorProvider
{
private const int NumberOfSamplesToConsider = 5;

private readonly ITaskSeriesTimer _timer;
private readonly IDelayStrategy _delayStrategy;
private readonly QueueClient _queue;
Expand All @@ -44,8 +41,9 @@ internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INo
private readonly ILogger<QueueListener> _logger;
private readonly FunctionDescriptor _functionDescriptor;
private readonly string _functionId;
private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
private readonly CancellationTokenSource _shutdownCancellationTokenSource;
private readonly Lazy<QueueTargetScaler> _targetScaler;
private readonly Lazy<QueueScaleMonitor> _scaleMonitor;

private bool? _queueExists;
private bool _foundMessageSinceLastDelay;
Expand All @@ -57,6 +55,8 @@ internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INo
// for mock testing only
internal QueueListener()
{
_scaleMonitor = new Lazy<QueueScaleMonitor>(() => new QueueScaleMonitor());
_targetScaler = new Lazy<QueueTargetScaler>(() => new QueueTargetScaler());
}

public QueueListener(QueueClient queue,
Expand Down Expand Up @@ -130,10 +130,19 @@ public QueueListener(QueueClient queue,

_delayStrategy = new RandomizedExponentialBackoffStrategy(QueuePollingIntervals.Minimum, maximumInterval);

_scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-QueueTrigger-{_queue.Name}".ToLower(CultureInfo.InvariantCulture), _functionId);
_shutdownCancellationTokenSource = new CancellationTokenSource();

_concurrencyManager = concurrencyManager;

_targetScaler = new Lazy<QueueTargetScaler>(
() => new QueueTargetScaler(
_functionId,
queue,
queueOptions,
loggerFactory
));

_scaleMonitor = new Lazy<QueueScaleMonitor>(() => new QueueScaleMonitor(_functionId, _queue, loggerFactory));
}

// for testing
Expand Down Expand Up @@ -444,185 +453,14 @@ internal static void RegisterSharedWatcherWithQueueProcessor(QueueProcessor queu
}
}

public ScaleMonitorDescriptor Descriptor
{
get
{
return _scaleMonitorDescriptor;
}
}

async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
return await GetMetricsAsync().ConfigureAwait(false);
}

public async Task<QueueTriggerMetrics> GetMetricsAsync()
{
int queueLength = 0;
TimeSpan queueTime = TimeSpan.Zero;

try
{
QueueProperties queueProperties = await _queue.GetPropertiesAsync().ConfigureAwait(false);
queueLength = queueProperties.ApproximateMessagesCount;

if (queueLength > 0)
{
PeekedMessage message = (await _queue.PeekMessagesAsync(1).ConfigureAwait(false)).Value.FirstOrDefault();
if (message != null)
{
if (message.InsertedOn.HasValue)
{
queueTime = DateTime.UtcNow.Subtract(message.InsertedOn.Value.DateTime);
}
}
else
{
// ApproximateMessageCount often returns a stale value,
// especially when the queue is empty.
queueLength = 0;
}
}
}
catch (RequestFailedException ex)
{
if (ex.IsNotFoundQueueNotFound() ||
ex.IsConflictQueueBeingDeletedOrDisabled() ||
ex.IsServerSideError())
{
// ignore transient errors, and return default metrics
// E.g. if the queue doesn't exist, we'll return a zero queue length
// and scale in
_logger.LogWarning($"Error querying for queue scale status: {ex.Message}");
}
}

return new QueueTriggerMetrics
{
QueueLength = queueLength,
QueueTime = queueTime,
Timestamp = DateTime.UtcNow
};
}

ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<QueueTriggerMetrics>().ToArray());
}

public ScaleStatus GetScaleStatus(ScaleStatusContext<QueueTriggerMetrics> context)
public ITargetScaler GetTargetScaler()
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
return _targetScaler.Value;
}

private ScaleStatus GetScaleStatusCore(int workerCount, QueueTriggerMetrics[] metrics)
public IScaleMonitor GetMonitor()
{
ScaleStatus status = new ScaleStatus
{
Vote = ScaleVote.None
};

// verify we have enough samples to make a scale decision.
if (metrics == null || (metrics.Length < NumberOfSamplesToConsider))
{
return status;
}

// Maintain a minimum ratio of 1 worker per 1,000 queue messages.
long latestQueueLength = metrics.Last().QueueLength;
if (latestQueueLength > workerCount * 1000)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation($"QueueLength ({latestQueueLength}) > workerCount ({workerCount}) * 1,000");
_logger.LogInformation($"Length of queue ({_queue.Name}, {latestQueueLength}) is too high relative to the number of instances ({workerCount}).");
return status;
}

// Check to see if the queue has been empty for a while.
bool queueIsIdle = metrics.All(p => p.QueueLength == 0);
if (queueIsIdle)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation($"Queue '{_queue.Name}' is idle");
return status;
}

// Samples are in chronological order. Check for a continuous increase in time or length.
// If detected, this results in an automatic scale out.
if (metrics[0].QueueLength > 0)
{
bool queueLengthIncreasing =
IsTrueForLastN(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.QueueLength < next.QueueLength);
if (queueLengthIncreasing)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation($"Queue length is increasing for '{_queue.Name}'");
return status;
}
}

if (metrics[0].QueueTime > TimeSpan.Zero && metrics[0].QueueTime < metrics[NumberOfSamplesToConsider - 1].QueueTime)
{
bool queueTimeIncreasing =
IsTrueForLastN(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.QueueTime <= next.QueueTime);
if (queueTimeIncreasing)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation($"Queue time is increasing for '{_queue.Name}'");
return status;
}
}

bool queueLengthDecreasing =
IsTrueForLastN(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.QueueLength > next.QueueLength);
if (queueLengthDecreasing)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation($"Queue length is decreasing for '{_queue.Name}'");
return status;
}

bool queueTimeDecreasing = IsTrueForLastN(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.QueueTime > next.QueueTime);
if (queueTimeDecreasing)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation($"Queue time is decreasing for '{_queue.Name}'");
return status;
}

_logger.LogInformation($"Queue '{_queue.Name}' is steady");

return status;
}

private static bool IsTrueForLastN(IList<QueueTriggerMetrics> samples, int count, Func<QueueTriggerMetrics, QueueTriggerMetrics, bool> predicate)
{
Debug.Assert(count > 1, "count must be greater than 1.");
Debug.Assert(count <= samples.Count, "count must be less than or equal to the list size.");

// Walks through the list from left to right starting at len(samples) - count.
for (int i = samples.Count - count; i < samples.Count - 1; i++)
{
if (!predicate(samples[i], samples[i + 1]))
{
return false;
}
}

return true;
return _scaleMonitor.Value;
}
}
}
Loading