diff --git a/Directory.Packages.props b/Directory.Packages.props index 1d2ec0a3..9064c766 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -4,8 +4,8 @@ + - diff --git a/docs/BindingsOverview.md b/docs/BindingsOverview.md index 90884aea..095e7673 100644 --- a/docs/BindingsOverview.md +++ b/docs/BindingsOverview.md @@ -173,9 +173,16 @@ This controls the upper limit on the number of pending changes in the user table ### Scaling for Trigger Bindings -If your application containing functions with SQL trigger bindings is running as an Azure function app, it will be scaled automatically based on the amount of changes that are pending to be processed in the user table. As of today, we only support scaling of function apps running in Elastic Premium plan. To enable scaling, you will need to go the function app resource's page on Azure Portal, then to Configuration > 'Function runtime settings' and turn on 'Runtime Scale Monitoring'. For more information, check documentation on [Runtime Scaling](https://learn.microsoft.com/azure/azure-functions/event-driven-scaling#runtime-scaling). You can configure scaling parameters by going to 'Scale out (App Service plan)' setting on the function app's page. To understand various scale settings, please check the respective sections in [Azure Functions Premium plan](https://learn.microsoft.com/azure/azure-functions/functions-premium-plan?tabs=portal#eliminate-cold-starts)'s documentation. +If your application containing functions with SQL trigger bindings is running as an Azure function app, it will be scaled automatically based on the number of changes that are pending to be processed in the user table. As of today, we only support scaling of function apps running in the Elastic Premium plan with 'Runtime Scale Monitoring' enabled. To enable scaling, you will need to go to the function app resource's page on Azure Portal, then to Configuration > 'Function runtime settings' and turn on 'Runtime Scale Monitoring'. 
+ +There are two types of scaling available: + +- Incremental scaling - This scales the application serially, increasing or decreasing the number of workers by 1 at a time. Several checks are made to decide whether the host application needs to be scaled in or out. The rationale behind these checks is to ensure that the count of pending changes per application-worker stays below a certain maximum limit, controlled by [Sql_Trigger_MaxChangesPerWorker](#sql_trigger_maxchangesperworker), while also ensuring that the number of workers running stays minimal. The scaling decision is based on the latest count of pending changes and on whether the last 5 samples we took were continuously increasing or decreasing. + +- Target Based Scaling - This type of scaling depends on the pending change count and the value of [dynamic concurrency](https://learn.microsoft.com/azure/azure-functions/functions-concurrency#dynamic-concurrency); if dynamic concurrency is not enabled, the concurrency value defaults to [Sql_Trigger_MaxChangesPerWorker](#sql_trigger_maxchangesperworker). The target worker count is calculated by dividing the number of pending changes by the concurrency value. The application scales out to the number of instances specified by the target worker count. + +For more information, see the documentation on [Runtime Scaling](https://learn.microsoft.com/azure/azure-functions/event-driven-scaling#runtime-scaling). You can configure scaling parameters by going to the 'Scale out (App Service plan)' setting on the function app's page. To understand the various scale settings, please check the respective sections in the [Azure Functions Premium plan](https://learn.microsoft.com/azure/azure-functions/functions-premium-plan?tabs=portal#eliminate-cold-starts) documentation. -There are a couple of checks made to decide on whether the host application needs to be scaled in or out. 
The rationale behind these checks is to ensure that the count of pending changes per application-worker stays below a certain maximum limit, which is defaulted to 1000, while also ensuring that the number of workers running stays minimal. The scaling decision is made based on the latest count of the pending changes and whether the last 5 times we checked the count, we found it to be continuously increasing or decreasing. ### Retry support for Trigger Bindings diff --git a/nuget.config b/nuget.config index 99bfee80..465dbf34 100644 --- a/nuget.config +++ b/nuget.config @@ -19,4 +19,4 @@ - \ No newline at end of file + diff --git a/performance/packages.lock.json b/performance/packages.lock.json index cfef194b..b2526262 100644 --- a/performance/packages.lock.json +++ b/performance/packages.lock.json @@ -277,11 +277,12 @@ }, "Microsoft.Azure.WebJobs.Core": { "type": "Transitive", - "resolved": "3.0.33", - "contentHash": "4Rp5of6KFEGisQL7OJV2/8v3IQGFvTgZ9xSbB3bfkw5VHoNKn3Yllx7Qk/oUUX7Zey7jNu5mQFIR6dW6xNMX5Q==", + "resolved": "3.0.36", + "contentHash": "4ht/u677qxDL5rviXafYI5K9qtxee8peXQd8JDqG5KBRr5VXct2pgpEs5SnNu+gDO8dAVp+83N1IRiGG2Fl7Nw==", "dependencies": { "System.ComponentModel.Annotations": "4.4.0", - "System.Diagnostics.TraceSource": "4.3.0" + "System.Diagnostics.TraceSource": "4.3.0", + "System.Memory.Data": "1.0.1" } }, "Microsoft.Azure.WebJobs.Extensions": { @@ -1860,7 +1861,7 @@ "dependencies": { "Microsoft.ApplicationInsights": "[2.21.0, )", "Microsoft.AspNetCore.Http": "[2.2.2, )", - "Microsoft.Azure.WebJobs": "[3.0.33, )", + "Microsoft.Azure.WebJobs": "[3.0.36, )", "Microsoft.Data.SqlClient": "[5.0.1, )", "Newtonsoft.Json": "[13.0.2, )", "System.Runtime.Caching": "[5.0.0, )", @@ -1950,11 +1951,11 @@ }, "Microsoft.Azure.WebJobs": { "type": "CentralTransitive", - "requested": "[3.0.33, )", - "resolved": "3.0.33", - "contentHash": "eSv858ll+GsLYxM2+RKBiTC34CvGnEgLtnrzRljzrociIXsosadHDQLxvvqu0eyIQRRf5kc4MHII/wc0HRNvyg==", + "requested": "[3.0.36, )", + 
"resolved": "3.0.36", + "contentHash": "S5wppmL8sUfanGmo/zDzpoWrcM+IOvGNFjpkD5XJFCFzrBqktZTEX6MpP/vF3RN6VZm2sFKegZYyce+RNhgE4Q==", "dependencies": { - "Microsoft.Azure.WebJobs.Core": "3.0.33", + "Microsoft.Azure.WebJobs.Core": "3.0.36", "Microsoft.Extensions.Configuration": "2.1.1", "Microsoft.Extensions.Configuration.Abstractions": "2.1.1", "Microsoft.Extensions.Configuration.EnvironmentVariables": "2.1.0", @@ -1963,8 +1964,8 @@ "Microsoft.Extensions.Logging": "2.1.1", "Microsoft.Extensions.Logging.Abstractions": "2.1.1", "Microsoft.Extensions.Logging.Configuration": "2.1.0", - "Newtonsoft.Json": "11.0.2", - "System.Memory.Data": "1.0.1", + "Newtonsoft.Json": "13.0.1", + "System.Memory.Data": "1.0.2", "System.Threading.Tasks.Dataflow": "4.8.0" } }, diff --git a/samples/samples-csharp/packages.lock.json b/samples/samples-csharp/packages.lock.json index a4f5a09f..dea9ef80 100644 --- a/samples/samples-csharp/packages.lock.json +++ b/samples/samples-csharp/packages.lock.json @@ -277,11 +277,12 @@ }, "Microsoft.Azure.WebJobs.Core": { "type": "Transitive", - "resolved": "3.0.33", - "contentHash": "4Rp5of6KFEGisQL7OJV2/8v3IQGFvTgZ9xSbB3bfkw5VHoNKn3Yllx7Qk/oUUX7Zey7jNu5mQFIR6dW6xNMX5Q==", + "resolved": "3.0.36", + "contentHash": "4ht/u677qxDL5rviXafYI5K9qtxee8peXQd8JDqG5KBRr5VXct2pgpEs5SnNu+gDO8dAVp+83N1IRiGG2Fl7Nw==", "dependencies": { "System.ComponentModel.Annotations": "4.4.0", - "System.Diagnostics.TraceSource": "4.3.0" + "System.Diagnostics.TraceSource": "4.3.0", + "System.Memory.Data": "1.0.1" } }, "Microsoft.Azure.WebJobs.Extensions": { @@ -1720,7 +1721,7 @@ "dependencies": { "Microsoft.ApplicationInsights": "[2.21.0, )", "Microsoft.AspNetCore.Http": "[2.2.2, )", - "Microsoft.Azure.WebJobs": "[3.0.33, )", + "Microsoft.Azure.WebJobs": "[3.0.36, )", "Microsoft.Data.SqlClient": "[5.0.1, )", "Newtonsoft.Json": "[13.0.2, )", "System.Runtime.Caching": "[5.0.0, )", @@ -1761,11 +1762,11 @@ }, "Microsoft.Azure.WebJobs": { "type": "CentralTransitive", - "requested": 
"[3.0.33, )", - "resolved": "3.0.33", - "contentHash": "eSv858ll+GsLYxM2+RKBiTC34CvGnEgLtnrzRljzrociIXsosadHDQLxvvqu0eyIQRRf5kc4MHII/wc0HRNvyg==", + "requested": "[3.0.36, )", + "resolved": "3.0.36", + "contentHash": "S5wppmL8sUfanGmo/zDzpoWrcM+IOvGNFjpkD5XJFCFzrBqktZTEX6MpP/vF3RN6VZm2sFKegZYyce+RNhgE4Q==", "dependencies": { - "Microsoft.Azure.WebJobs.Core": "3.0.33", + "Microsoft.Azure.WebJobs.Core": "3.0.36", "Microsoft.Extensions.Configuration": "2.1.1", "Microsoft.Extensions.Configuration.Abstractions": "2.1.1", "Microsoft.Extensions.Configuration.EnvironmentVariables": "2.1.0", @@ -1774,8 +1775,8 @@ "Microsoft.Extensions.Logging": "2.1.1", "Microsoft.Extensions.Logging.Abstractions": "2.1.1", "Microsoft.Extensions.Logging.Configuration": "2.1.0", - "Newtonsoft.Json": "11.0.2", - "System.Memory.Data": "1.0.1", + "Newtonsoft.Json": "13.0.1", + "System.Memory.Data": "1.0.2", "System.Threading.Tasks.Dataflow": "4.8.0" } }, diff --git a/samples/samples-csharpscript/packages.lock.json b/samples/samples-csharpscript/packages.lock.json index 2b848581..ab287abb 100644 --- a/samples/samples-csharpscript/packages.lock.json +++ b/samples/samples-csharpscript/packages.lock.json @@ -130,11 +130,12 @@ }, "Microsoft.Azure.WebJobs.Core": { "type": "Transitive", - "resolved": "3.0.33", - "contentHash": "4Rp5of6KFEGisQL7OJV2/8v3IQGFvTgZ9xSbB3bfkw5VHoNKn3Yllx7Qk/oUUX7Zey7jNu5mQFIR6dW6xNMX5Q==", + "resolved": "3.0.36", + "contentHash": "4ht/u677qxDL5rviXafYI5K9qtxee8peXQd8JDqG5KBRr5VXct2pgpEs5SnNu+gDO8dAVp+83N1IRiGG2Fl7Nw==", "dependencies": { "System.ComponentModel.Annotations": "4.4.0", - "System.Diagnostics.TraceSource": "4.3.0" + "System.Diagnostics.TraceSource": "4.3.0", + "System.Memory.Data": "1.0.1" } }, "Microsoft.Azure.WebJobs.Extensions.Storage.Blobs": { @@ -1574,7 +1575,7 @@ "dependencies": { "Microsoft.ApplicationInsights": "[2.21.0, )", "Microsoft.AspNetCore.Http": "[2.2.2, )", - "Microsoft.Azure.WebJobs": "[3.0.33, )", + "Microsoft.Azure.WebJobs": "[3.0.36, )", 
"Microsoft.Data.SqlClient": "[5.0.1, )", "Newtonsoft.Json": "[13.0.2, )", "System.Runtime.Caching": "[5.0.0, )", @@ -1592,11 +1593,11 @@ }, "Microsoft.Azure.WebJobs": { "type": "CentralTransitive", - "requested": "[3.0.33, )", - "resolved": "3.0.33", - "contentHash": "eSv858ll+GsLYxM2+RKBiTC34CvGnEgLtnrzRljzrociIXsosadHDQLxvvqu0eyIQRRf5kc4MHII/wc0HRNvyg==", + "requested": "[3.0.36, )", + "resolved": "3.0.36", + "contentHash": "S5wppmL8sUfanGmo/zDzpoWrcM+IOvGNFjpkD5XJFCFzrBqktZTEX6MpP/vF3RN6VZm2sFKegZYyce+RNhgE4Q==", "dependencies": { - "Microsoft.Azure.WebJobs.Core": "3.0.33", + "Microsoft.Azure.WebJobs.Core": "3.0.36", "Microsoft.Extensions.Configuration": "2.1.1", "Microsoft.Extensions.Configuration.Abstractions": "2.1.1", "Microsoft.Extensions.Configuration.EnvironmentVariables": "2.1.0", @@ -1605,8 +1606,8 @@ "Microsoft.Extensions.Logging": "2.1.1", "Microsoft.Extensions.Logging.Abstractions": "2.1.1", "Microsoft.Extensions.Logging.Configuration": "2.1.0", - "Newtonsoft.Json": "11.0.2", - "System.Memory.Data": "1.0.1", + "Newtonsoft.Json": "13.0.1", + "System.Memory.Data": "1.0.2", "System.Threading.Tasks.Dataflow": "4.8.0" } }, diff --git a/samples/samples-outofproc/packages.lock.json b/samples/samples-outofproc/packages.lock.json index 7698d588..15d9eb98 100644 --- a/samples/samples-outofproc/packages.lock.json +++ b/samples/samples-outofproc/packages.lock.json @@ -846,9 +846,9 @@ "microsoft.azure.functions.worker.extensions.sql": { "type": "Project", "dependencies": { - "Microsoft.Azure.Functions.Worker.Extensions.Abstractions": "[1.1.0, )", - "Microsoft.Data.SqlClient": "[5.0.1, )", - "System.Drawing.Common": "[5.0.3, )" + "Microsoft.Azure.Functions.Worker.Extensions.Abstractions": "1.1.0", + "Microsoft.Data.SqlClient": "5.0.1", + "System.Drawing.Common": "5.0.3" } }, "Microsoft.Azure.Functions.Worker.Extensions.Abstractions": { diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 4e4cc314..35769388 
100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -180,69 +180,6 @@ public void Dispose() this._cancellationTokenSourceCheckForChanges.Cancel(); } - public async Task GetUnprocessedChangeCountAsync() - { - long unprocessedChangeCount = 0L; - - try - { - long getUnprocessedChangesDurationMs = 0L; - - using (var connection = new SqlConnection(this._connectionString)) - { - this._logger.LogDebugWithThreadId("BEGIN OpenGetUnprocessedChangesConnection"); - await connection.OpenAsync(); - this._logger.LogDebugWithThreadId("END OpenGetUnprocessedChangesConnection"); - - // Use a transaction to automatically release the app lock when we're done executing the query - using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead)) - { - try - { - using (SqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection, transaction)) - { - this._logger.LogInformation("Getting change count"); - this._logger.LogDebugWithThreadId($"BEGIN GetUnprocessedChangeCount Query={getUnprocessedChangesCommand.CommandText}"); - var commandSw = Stopwatch.StartNew(); - unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsync(); - getUnprocessedChangesDurationMs = commandSw.ElapsedMilliseconds; - } - - this._logger.LogDebugWithThreadId($"END GetUnprocessedChangeCount Duration={getUnprocessedChangesDurationMs}ms Count={unprocessedChangeCount}"); - transaction.Commit(); - } - catch (Exception) - { - try - { - transaction.Rollback(); - } - catch (Exception ex2) - { - this._logger.LogError($"GetUnprocessedChangeCount : Failed to rollback transaction due to exception: {ex2.GetType()}. 
Exception message: {ex2.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCountRollback, ex2, this._telemetryProps); - } - throw; - } - } - } - - var measures = new Dictionary - { - [TelemetryMeasureName.GetUnprocessedChangesDurationMs] = getUnprocessedChangesDurationMs, - [TelemetryMeasureName.UnprocessedChangeCount] = unprocessedChangeCount, - }; - } - catch (Exception ex) - { - this._logger.LogError($"Failed to query count of unprocessed changes for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCount, ex, this._telemetryProps); - throw; - } - - return unprocessedChangeCount; - } - /// /// Executed once every period. Each iteration will go through a series of state /// changes, if an error occurs in any of them then it will skip the rest of the states and try again in the next @@ -904,38 +841,6 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo return new SqlCommand(getChangesQuery, connection, transaction); } - /// - /// Builds the query to get count of unprocessed changes in the user's table. This one mimics the query that is - /// used by workers to get the changes for processing. 
- /// - /// The connection to add to the returned SqlCommand - /// The transaction to add to the returned SqlCommand - /// The SqlCommand populated with the query and appropriate parameters - private SqlCommand BuildGetUnprocessedChangesCommand(SqlConnection connection, SqlTransaction transaction) - { - string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}")); - - string getUnprocessedChangesQuery = $@" - {AppLockStatements} - - DECLARE @last_sync_version bigint; - SELECT @last_sync_version = LastSyncVersion - FROM {GlobalStateTableName} - WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId}; - - SELECT COUNT_BIG(*) - FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c - LEFT OUTER JOIN {this._leasesTableName} AS l ON {leasesTableJoinCondition} - WHERE - (l.{LeasesTableLeaseExpirationTimeColumnName} IS NULL AND - (l.{LeasesTableChangeVersionColumnName} IS NULL OR l.{LeasesTableChangeVersionColumnName} < c.{SysChangeVersionColumnName}) OR - l.{LeasesTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND - (l.{LeasesTableAttemptCountColumnName} IS NULL OR l.{LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount}); - "; - - return new SqlCommand(getUnprocessedChangesQuery, connection, transaction); - } - /// /// Builds the query to acquire leases on the rows in "_rows" if changes are detected in the user's table /// (). 
diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index 59c9f6a4..50b28ac4 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -12,13 +12,13 @@ using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerConstants; using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlBindingUtilities; +using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerUtils; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Configuration; -using MoreLinq; namespace Microsoft.Azure.WebJobs.Extensions.Sql { @@ -26,7 +26,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql /// Represents the listener to SQL table changes. /// /// POCO class representing the row in the user table - internal sealed class SqlTriggerListener : IListener, IScaleMonitor + internal sealed class SqlTriggerListener : IListener, IScaleMonitorProvider, ITargetScalerProvider { private const int ListenerNotStarted = 0; private const int ListenerStarting = 1; @@ -44,16 +44,17 @@ internal sealed class SqlTriggerListener : IListener, IScaleMonitor _telemetryProps = new Dictionary(); private readonly int _maxChangesPerWorker; private readonly bool _hasConfiguredMaxChangesPerWorker = false; private SqlTableChangeMonitor _changeMonitor; + private readonly IScaleMonitor _scaleMonitor; + private readonly ITargetScaler _targetScaler; + private int _listenerState = ListenerNotStarted; - ScaleMonitorDescriptor IScaleMonitor.Descriptor => this._scaleMonitorDescriptor; /// /// Initializes a new instance of the class. @@ -73,9 +74,6 @@ public SqlTriggerListener(string connectionString, string tableName, string user this._logger = logger ?? 
throw new ArgumentNullException(nameof(logger)); this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration)); int? configuredMaxChangesPerWorker; - // Do not convert the scale-monitor ID to lower-case string since SQL table names can be case-sensitive - // depending on the collation of the current database. - this._scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{userFunctionId}-SqlTrigger-{tableName}"); configuredMaxChangesPerWorker = configuration.GetValue(ConfigKey_SqlTrigger_MaxChangesPerWorker); this._maxChangesPerWorker = configuredMaxChangesPerWorker ?? DefaultMaxChangesPerWorker; if (this._maxChangesPerWorker <= 0) @@ -83,6 +81,9 @@ public SqlTriggerListener(string connectionString, string tableName, string user throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_MaxChangesPerWorker}'. Ensure that the value is a positive integer."); } this._hasConfiguredMaxChangesPerWorker = configuredMaxChangesPerWorker != null; + + this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable.FullName, this._connectionString, this._maxChangesPerWorker, this._logger); + this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable.FullName, this._connectionString, this._maxChangesPerWorker, this._logger); } public void Cancel() @@ -120,8 +121,8 @@ public async Task StartAsync(CancellationToken cancellationToken) await VerifyDatabaseSupported(connection, this._logger, cancellationToken); - int userTableId = await this.GetUserTableIdAsync(connection, cancellationToken); - IReadOnlyList<(string name, string type)> primaryKeyColumns = await this.GetPrimaryKeyColumnsAsync(connection, userTableId, cancellationToken); + int userTableId = await GetUserTableIdAsync(connection, this._userTable.FullName, this._logger, cancellationToken); + IReadOnlyList<(string name, string type)> primaryKeyColumns = await GetPrimaryKeyColumnsAsync(connection, 
userTableId, this._logger, this._userTable.FullName, cancellationToken); IReadOnlyList userTableColumns = await this.GetUserTableColumnsAsync(connection, userTableId, cancellationToken); string leasesTableName = string.Format(CultureInfo.InvariantCulture, LeasesTableNameFormat, $"{this._userFunctionId}_{userTableId}"); @@ -207,97 +208,6 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - /// - /// Returns the object ID of the user table. - /// - /// Thrown in case of error when querying the object ID for the user table - private async Task GetUserTableIdAsync(SqlConnection connection, CancellationToken cancellationToken) - { - string getObjectIdQuery = $"SELECT OBJECT_ID(N{this._userTable.QuotedFullName}, 'U');"; - - this._logger.LogDebugWithThreadId($"BEGIN GetUserTableId Query={getObjectIdQuery}"); - using (var getObjectIdCommand = new SqlCommand(getObjectIdQuery, connection)) - using (SqlDataReader reader = await getObjectIdCommand.ExecuteReaderAsync(cancellationToken)) - { - if (!await reader.ReadAsync(cancellationToken)) - { - throw new InvalidOperationException($"Received empty response when querying the object ID for table: '{this._userTable.FullName}'."); - } - - object userTableId = reader.GetValue(0); - - if (userTableId is DBNull) - { - throw new InvalidOperationException($"Could not find table: '{this._userTable.FullName}'."); - } - this._logger.LogDebugWithThreadId($"END GetUserTableId TableId={userTableId}"); - return (int)userTableId; - } - } - - /// - /// Gets the names and types of primary key columns of the user table. - /// - /// - /// Thrown if there are no primary key columns present in the user table or if their names conflict with columns in leases table. 
- /// - private async Task> GetPrimaryKeyColumnsAsync(SqlConnection connection, int userTableId, CancellationToken cancellationToken) - { - const int NameIndex = 0, TypeIndex = 1, LengthIndex = 2, PrecisionIndex = 3, ScaleIndex = 4; - string getPrimaryKeyColumnsQuery = $@" - SELECT - c.name, - t.name, - c.max_length, - c.precision, - c.scale - FROM sys.indexes AS i - INNER JOIN sys.index_columns AS ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id - INNER JOIN sys.columns AS c ON ic.object_id = c.object_id AND ic.column_id = c.column_id - INNER JOIN sys.types AS t ON c.user_type_id = t.user_type_id - WHERE i.is_primary_key = 1 AND i.object_id = {userTableId}; - "; - this._logger.LogDebugWithThreadId($"BEGIN GetPrimaryKeyColumns Query={getPrimaryKeyColumnsQuery}"); - using (var getPrimaryKeyColumnsCommand = new SqlCommand(getPrimaryKeyColumnsQuery, connection)) - using (SqlDataReader reader = await getPrimaryKeyColumnsCommand.ExecuteReaderAsync(cancellationToken)) - { - string[] variableLengthTypes = new[] { "varchar", "nvarchar", "nchar", "char", "binary", "varbinary" }; - string[] variablePrecisionTypes = new[] { "numeric", "decimal" }; - - var primaryKeyColumns = new List<(string name, string type)>(); - - while (await reader.ReadAsync(cancellationToken)) - { - string name = reader.GetString(NameIndex); - string type = reader.GetString(TypeIndex); - - if (variableLengthTypes.Contains(type, StringComparer.OrdinalIgnoreCase)) - { - // Special "max" case. I'm actually not sure it's valid to have varchar(max) as a primary key because - // it exceeds the byte limit of an index field (900 bytes), but just in case - short length = reader.GetInt16(LengthIndex); - type += length == -1 ? 
"(max)" : $"({length})"; - } - else if (variablePrecisionTypes.Contains(type)) - { - byte precision = reader.GetByte(PrecisionIndex); - byte scale = reader.GetByte(ScaleIndex); - type += $"({precision},{scale})"; - } - - primaryKeyColumns.Add((name, type)); - } - - if (primaryKeyColumns.Count == 0) - { - throw new InvalidOperationException($"Could not find primary key created in table: '{this._userTable.FullName}'."); - } - - this._logger.LogDebugWithThreadId($"END GetPrimaryKeyColumns ColumnNames(types) = {string.Join(", ", primaryKeyColumns.Select(col => $"'{col.name}({col.type})'"))}."); - return primaryKeyColumns; - } - } - /// /// Gets the column names of the user table. /// @@ -571,152 +481,14 @@ PRIMARY KEY ({primaryKeys}) } } - async Task IScaleMonitor.GetMetricsAsync() - { - return await this.GetMetricsAsync(); - } - - public async Task GetMetricsAsync() - { - Debug.Assert(!(this._changeMonitor is null)); - - return new SqlTriggerMetrics - { - UnprocessedChangeCount = await this._changeMonitor.GetUnprocessedChangeCountAsync(), - Timestamp = DateTime.UtcNow, - }; - } - - ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) + public IScaleMonitor GetMonitor() { - return this.GetScaleStatusWithTelemetry(context.WorkerCount, context.Metrics?.Cast().ToArray()); + return this._scaleMonitor; } - public ScaleStatus GetScaleStatus(ScaleStatusContext context) + public ITargetScaler GetTargetScaler() { - return this.GetScaleStatusWithTelemetry(context.WorkerCount, context.Metrics?.ToArray()); - } - - private ScaleStatus GetScaleStatusWithTelemetry(int workerCount, SqlTriggerMetrics[] metrics) - { - var status = new ScaleStatus - { - Vote = ScaleVote.None, - }; - - var properties = new Dictionary(this._telemetryProps) - { - [TelemetryPropertyName.ScaleRecommendation] = $"{status.Vote}", - [TelemetryPropertyName.TriggerMetrics] = metrics is null ? 
"null" : $"[{string.Join(", ", metrics.Select(metric => metric.UnprocessedChangeCount))}]", - [TelemetryPropertyName.WorkerCount] = $"{workerCount}", - }; - - try - { - status = this.GetScaleStatusCore(workerCount, metrics); - - properties[TelemetryPropertyName.ScaleRecommendation] = $"{status.Vote}"; - TelemetryInstance.TrackEvent(TelemetryEventName.GetScaleStatus, properties); - } - catch (Exception ex) - { - this._logger.LogError($"Failed to get scale status for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}"); - TelemetryInstance.TrackException(TelemetryErrorName.GetScaleStatus, ex, properties); - } - - return status; - } - - /// - /// Returns scale recommendation i.e. whether to scale in or out the host application. The recommendation is - /// made based on both the latest metrics and the trend of increase or decrease in the count of unprocessed - /// changes in the user table. In all of the calculations, it is attempted to keep the number of workers minimum - /// while also ensuring that the count of unprocessed changes per worker stays under the maximum limit. - /// - /// The current worker count for the host application. - /// The collection of metrics samples to make the scale decision. - /// - private ScaleStatus GetScaleStatusCore(int workerCount, SqlTriggerMetrics[] metrics) - { - // We require minimum 5 samples to estimate the trend of variation in count of unprocessed changes with - // certain reliability. These samples roughly cover the timespan of past 40 seconds. - const int minSamplesForScaling = 5; - - var status = new ScaleStatus - { - Vote = ScaleVote.None, - }; - - // Do not make a scale decision unless we have enough samples. 
- if (metrics is null || (metrics.Length < minSamplesForScaling)) - { - this._logger.LogInformation($"Requesting no-scaling: Insufficient metrics for making scale decision for table: '{this._userTable.FullName}'."); - return status; - } - - // Consider only the most recent batch of samples in the rest of the method. - metrics = metrics.TakeLast(minSamplesForScaling).ToArray(); - - string counts = string.Join(", ", metrics.Select(metric => metric.UnprocessedChangeCount)); - this._logger.LogInformation($"Unprocessed change counts: [{counts}], worker count: {workerCount}, maximum changes per worker: {this._maxChangesPerWorker}."); - - // Add worker if the count of unprocessed changes per worker exceeds the maximum limit. - long lastUnprocessedChangeCount = metrics.Last().UnprocessedChangeCount; - if (lastUnprocessedChangeCount > workerCount * this._maxChangesPerWorker) - { - status.Vote = ScaleVote.ScaleOut; - this._logger.LogInformation($"Requesting scale-out: Found too many unprocessed changes for table: '{this._userTable.FullName}' relative to the number of workers."); - return status; - } - - // Check if there is a continuous increase or decrease in count of unprocessed changes. - bool isIncreasing = true; - bool isDecreasing = true; - for (int index = 0; index < metrics.Length - 1; index++) - { - isIncreasing = isIncreasing && metrics[index].UnprocessedChangeCount < metrics[index + 1].UnprocessedChangeCount; - isDecreasing = isDecreasing && (metrics[index + 1].UnprocessedChangeCount == 0 || metrics[index].UnprocessedChangeCount > metrics[index + 1].UnprocessedChangeCount); - } - - if (isIncreasing) - { - // Scale out only if the expected count of unprocessed changes would exceed the combined limit after 30 seconds. 
- DateTime referenceTime = metrics[metrics.Length - 1].Timestamp - TimeSpan.FromSeconds(30); - SqlTriggerMetrics referenceMetric = metrics.First(metric => metric.Timestamp > referenceTime); - long expectedUnprocessedChangeCount = (2 * metrics[metrics.Length - 1].UnprocessedChangeCount) - referenceMetric.UnprocessedChangeCount; - - if (expectedUnprocessedChangeCount > workerCount * this._maxChangesPerWorker) - { - status.Vote = ScaleVote.ScaleOut; - this._logger.LogInformation($"Requesting scale-out: Found the unprocessed changes for table: '{this._userTable.FullName}' to be continuously increasing" + - " and may exceed the maximum limit set for the workers."); - return status; - } - else - { - this._logger.LogDebug($"Avoiding scale-out: Found the unprocessed changes for table: '{this._userTable.FullName}' to be increasing" + - " but they may not exceed the maximum limit set for the workers."); - } - } - - if (isDecreasing) - { - // Scale in only if the count of unprocessed changes will not exceed the combined limit post the scale-in operation. 
- if (lastUnprocessedChangeCount <= (workerCount - 1) * this._maxChangesPerWorker) - { - status.Vote = ScaleVote.ScaleIn; - this._logger.LogInformation($"Requesting scale-in: Found table: '{this._userTable.FullName}' to be either idle or the unprocessed changes to be continuously decreasing."); - return status; - } - else - { - this._logger.LogDebug($"Avoiding scale-in: Found the unprocessed changes for table: '{this._userTable.FullName}' to be decreasing" + - " but they are high enough to require all existing workers for processing."); - } - } - - this._logger.LogInformation($"Requesting no-scaling: Found the number of unprocessed changes for table: '{this._userTable.FullName}' to not require scaling."); - return status; + return this._targetScaler; } /// diff --git a/src/TriggerBinding/SqlTriggerMetricsProvider.cs b/src/TriggerBinding/SqlTriggerMetricsProvider.cs new file mode 100644 index 00000000..f02283d9 --- /dev/null +++ b/src/TriggerBinding/SqlTriggerMetricsProvider.cs @@ -0,0 +1,128 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Data; +using System.Diagnostics; +using System.Globalization; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; +using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry; +using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; +using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerConstants; +using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerUtils; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql +{ + /// + /// Provider class for unprocessed changes metrics for SQL trigger scaling. 
+ /// + internal class SqlTriggerMetricsProvider + { + private readonly string _connectionString; + private readonly ILogger _logger; + private readonly SqlObject _userTable; + private readonly string _userFunctionId; + + public SqlTriggerMetricsProvider(string connectionString, ILogger logger, SqlObject userTable, string userFunctionId) + { + this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString)); + this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); + this._userTable = userTable ?? throw new ArgumentNullException(nameof(userTable)); + this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId)); + } + public async Task GetMetricsAsync() + { + return new SqlTriggerMetrics + { + UnprocessedChangeCount = await this.GetUnprocessedChangeCountAsync(), + Timestamp = DateTime.UtcNow, + }; + } + private async Task GetUnprocessedChangeCountAsync() + { + long unprocessedChangeCount = 0L; + long getUnprocessedChangesDurationMs = 0L; + + try + { + using (var connection = new SqlConnection(this._connectionString)) + { + this._logger.LogDebugWithThreadId("BEGIN OpenGetUnprocessedChangesConnection"); + await connection.OpenAsync(); + this._logger.LogDebugWithThreadId("END OpenGetUnprocessedChangesConnection"); + + int userTableId = await GetUserTableIdAsync(connection, this._userTable.FullName, this._logger, CancellationToken.None); + IReadOnlyList<(string name, string type)> primaryKeyColumns = await GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, CancellationToken.None); + + // Use a transaction to automatically release the app lock when we're done executing the query + using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead)) + { + try + { + using (SqlCommand getUnprocessedChangesCommand = 
this.BuildGetUnprocessedChangesCommand(connection, transaction, primaryKeyColumns, userTableId)) + { + this._logger.LogDebugWithThreadId($"BEGIN GetUnprocessedChangeCount Query={getUnprocessedChangesCommand.CommandText}"); + var commandSw = Stopwatch.StartNew(); + unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsync(); + getUnprocessedChangesDurationMs = commandSw.ElapsedMilliseconds; + } + + this._logger.LogDebugWithThreadId($"END GetUnprocessedChangeCount Duration={getUnprocessedChangesDurationMs}ms Count={unprocessedChangeCount}"); + transaction.Commit(); + } + catch (Exception) + { + try + { + transaction.Rollback(); + } + catch (Exception ex2) + { + this._logger.LogError($"GetUnprocessedChangeCount : Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCountRollback, ex2); + } + throw; + } + } + } + } + catch (Exception ex) + { + this._logger.LogError($"Failed to query count of unprocessed changes for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. 
Exception message: {ex.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCount, ex, null, new Dictionary() { { TelemetryMeasureName.GetUnprocessedChangesDurationMs, getUnprocessedChangesDurationMs } }); + throw; + } + + return unprocessedChangeCount; + } + private SqlCommand BuildGetUnprocessedChangesCommand(SqlConnection connection, SqlTransaction transaction, IReadOnlyList<(string name, string type)> primaryKeyColumns, int userTableId) + { + string leasesTableJoinCondition = string.Join(" AND ", primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}")); + string leasesTableName = string.Format(CultureInfo.InvariantCulture, LeasesTableNameFormat, $"{this._userFunctionId}_{userTableId}"); + string getUnprocessedChangesQuery = $@" + {AppLockStatements} + + DECLARE @last_sync_version bigint; + SELECT @last_sync_version = LastSyncVersion + FROM {GlobalStateTableName} + WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {userTableId}; + + SELECT COUNT_BIG(*) + FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c + LEFT OUTER JOIN {leasesTableName} AS l ON {leasesTableJoinCondition} + WHERE + (l.{LeasesTableLeaseExpirationTimeColumnName} IS NULL AND + (l.{LeasesTableChangeVersionColumnName} IS NULL OR l.{LeasesTableChangeVersionColumnName} < c.{SysChangeVersionColumnName}) OR + l.{LeasesTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND + (l.{LeasesTableAttemptCountColumnName} IS NULL OR l.{LeasesTableAttemptCountColumnName} < {5}); + "; + + return new SqlCommand(getUnprocessedChangesQuery, connection, transaction); + } + } +} \ No newline at end of file diff --git a/src/TriggerBinding/SqlTriggerScaleMonitor.cs b/src/TriggerBinding/SqlTriggerScaleMonitor.cs new file mode 100644 index 00000000..85a96e7c --- /dev/null +++ b/src/TriggerBinding/SqlTriggerScaleMonitor.cs @@ -0,0 +1,184 @@ +// Copyright (c) Microsoft 
Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry; +using static Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry.Telemetry; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using MoreLinq; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql +{ + /// + /// Makes the scale decision for incremental scaling(+1, 0, -1) for workers required based on unprocessed changes. + /// Guidance for scaling information can be found here https://learn.microsoft.com/en-us/azure/azure-functions/event-driven-scaling + /// + internal sealed class SqlTriggerScaleMonitor : IScaleMonitor + { + private readonly ILogger _logger; + private readonly SqlObject _userTable; + private readonly SqlTriggerMetricsProvider _metricsProvider; + private readonly IDictionary _telemetryProps = new Dictionary(); + private readonly int _maxChangesPerWorker; + + public SqlTriggerScaleMonitor(string userFunctionId, string userTableName, string connectionString, int maxChangesPerWorker, ILogger logger) + { + _ = !string.IsNullOrEmpty(userFunctionId) ? true : throw new ArgumentNullException(userFunctionId); + _ = !string.IsNullOrEmpty(userTableName) ? true : throw new ArgumentNullException(userTableName); + this._userTable = new SqlObject(userTableName); + // Do not convert the scale-monitor ID to lower-case string since SQL table names can be case-sensitive + // depending on the collation of the current database. + this.Descriptor = new ScaleMonitorDescriptor($"{userFunctionId}-SqlTrigger-{this._userTable.FullName}", userFunctionId); + this._metricsProvider = new SqlTriggerMetricsProvider(connectionString, logger, this._userTable, userFunctionId); + this._logger = logger ?? 
throw new ArgumentNullException(nameof(logger)); + this._maxChangesPerWorker = maxChangesPerWorker; + } + + public ScaleMonitorDescriptor Descriptor { get; } + async Task IScaleMonitor.GetMetricsAsync() + { + return await this.GetMetricsAsync(); + } + + public async Task GetMetricsAsync() + { + return await this._metricsProvider.GetMetricsAsync().ConfigureAwait(false); + } + + ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) + { + return this.GetScaleStatusWithTelemetry(context.WorkerCount, context.Metrics?.Cast().ToArray()); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return this.GetScaleStatusWithTelemetry(context.WorkerCount, context.Metrics?.ToArray()); + } + + private ScaleStatus GetScaleStatusWithTelemetry(int workerCount, SqlTriggerMetrics[] metrics) + { + var status = new ScaleStatus + { + Vote = ScaleVote.None, + }; + + var properties = new Dictionary(this._telemetryProps) + { + [TelemetryPropertyName.ScaleRecommendation] = $"{status.Vote}", + [TelemetryPropertyName.TriggerMetrics] = metrics is null ? "null" : $"[{string.Join(", ", metrics.Select(metric => metric.UnprocessedChangeCount))}]", + [TelemetryPropertyName.WorkerCount] = $"{workerCount}", + }; + + try + { + status = this.GetScaleStatusCore(workerCount, metrics); + + properties[TelemetryPropertyName.ScaleRecommendation] = $"{status.Vote}"; + TelemetryInstance.TrackEvent(TelemetryEventName.GetScaleStatus, properties); + } + catch (Exception ex) + { + this._logger.LogError($"Failed to get scale status for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}"); + TelemetryInstance.TrackException(TelemetryErrorName.GetScaleStatus, ex, properties); + } + + return status; + } + + /// + /// Returns scale recommendation i.e. whether to scale in or out the host application. 
The recommendation is + /// made based on both the latest metrics and the trend of increase or decrease in the count of unprocessed + /// changes in the user table. In all of the calculations, it is attempted to keep the number of workers minimum + /// while also ensuring that the count of unprocessed changes per worker stays under the maximum limit. + /// + /// The current worker count for the host application. + /// The collection of metrics samples to make the scale decision. + /// + private ScaleStatus GetScaleStatusCore(int workerCount, SqlTriggerMetrics[] metrics) + { + // We require minimum 5 samples to estimate the trend of variation in count of unprocessed changes with + // certain reliability. These samples roughly cover the timespan of past 40 seconds. + const int minSamplesForScaling = 5; + + var status = new ScaleStatus + { + Vote = ScaleVote.None, + }; + + // Do not make a scale decision unless we have enough samples. + if (metrics is null || (metrics.Length < minSamplesForScaling)) + { + this._logger.LogInformation($"Requesting no-scaling: Insufficient metrics for making scale decision for table: '{this._userTable.FullName}'."); + return status; + } + + // Consider only the most recent batch of samples in the rest of the method. + metrics = metrics.TakeLast(minSamplesForScaling).ToArray(); + + string counts = string.Join(", ", metrics.Select(metric => metric.UnprocessedChangeCount)); + this._logger.LogDebugWithThreadId($"Unprocessed change counts: [{counts}], worker count: {workerCount}, maximum changes per worker: {this._maxChangesPerWorker}."); + + // Add worker if the count of unprocessed changes per worker exceeds the maximum limit. 
+ long lastUnprocessedChangeCount = metrics.Last().UnprocessedChangeCount; + if (lastUnprocessedChangeCount > workerCount * this._maxChangesPerWorker) + { + status.Vote = ScaleVote.ScaleOut; + this._logger.LogInformation($"Requesting scale-out: Found too many unprocessed changes: {lastUnprocessedChangeCount} for table: '{this._userTable.FullName}' relative to the number of workers."); + return status; + } + + // Check if there is a continuous increase or decrease in count of unprocessed changes. + bool isIncreasing = true; + bool isDecreasing = true; + for (int index = 0; index < metrics.Length - 1; index++) + { + isIncreasing = isIncreasing && metrics[index].UnprocessedChangeCount < metrics[index + 1].UnprocessedChangeCount; + isDecreasing = isDecreasing && (metrics[index + 1].UnprocessedChangeCount == 0 || metrics[index].UnprocessedChangeCount > metrics[index + 1].UnprocessedChangeCount); + } + + if (isIncreasing) + { + // Scale out only if the expected count of unprocessed changes would exceed the combined limit after 30 seconds. 
+ DateTime referenceTime = metrics[metrics.Length - 1].Timestamp - TimeSpan.FromSeconds(30); + SqlTriggerMetrics referenceMetric = metrics.First(metric => metric.Timestamp > referenceTime); + long expectedUnprocessedChangeCount = (2 * metrics[metrics.Length - 1].UnprocessedChangeCount) - referenceMetric.UnprocessedChangeCount; + + if (expectedUnprocessedChangeCount > workerCount * this._maxChangesPerWorker) + { + status.Vote = ScaleVote.ScaleOut; + this._logger.LogInformation($"Requesting scale-out: Found the unprocessed changes for table: '{this._userTable.FullName}' to be continuously increasing" + + " and may exceed the maximum limit set for the workers."); + return status; + } + else + { + this._logger.LogDebugWithThreadId($"Avoiding scale-out: Found the unprocessed changes: {lastUnprocessedChangeCount} for table: '{this._userTable.FullName}' to be increasing" + + " but they may not exceed the maximum limit set for the workers."); + } + } + + if (isDecreasing) + { + // Scale in only if the count of unprocessed changes will not exceed the combined limit post the scale-in operation. 
+ if (lastUnprocessedChangeCount <= (workerCount - 1) * this._maxChangesPerWorker) + { + status.Vote = ScaleVote.ScaleIn; + this._logger.LogInformation($"Requesting scale-in: Found table: '{this._userTable.FullName}' to be either idle or the unprocessed changes to be continuously decreasing."); + return status; + } + else + { + this._logger.LogDebugWithThreadId($"Avoiding scale-in: Found the unprocessed changes for table: '{this._userTable.FullName}' to be decreasing" + + " but they are high enough to require all existing workers for processing."); + } + } + + this._logger.LogInformation($"Requesting no-scaling: Found the number of unprocessed changes for table: '{this._userTable.FullName}' to not require scaling."); + return status; + } + } +} \ No newline at end of file diff --git a/src/TriggerBinding/SqlTriggerTargetScaler.cs b/src/TriggerBinding/SqlTriggerTargetScaler.cs new file mode 100644 index 00000000..5b332131 --- /dev/null +++ b/src/TriggerBinding/SqlTriggerTargetScaler.cs @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using System; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql +{ + /// + /// Represents the target based scaler returning a target worker count. + /// + internal sealed class SqlTriggerTargetScaler : ITargetScaler + { + private readonly string _userFunctionId; + private readonly string _userTableName; + private readonly ILogger _logger; + private readonly SqlTriggerMetricsProvider _metricsProvider; + private readonly int _maxChangesPerWorker; + + public SqlTriggerTargetScaler(string userFunctionId, string userTableName, string connectionString, int maxChangesPerWorker, ILogger logger) + { + this._userTableName = !string.IsNullOrEmpty(userTableName) ? 
userTableName : throw new ArgumentNullException(userTableName); + this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(userFunctionId); + this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); + this._metricsProvider = new SqlTriggerMetricsProvider(connectionString, logger, new SqlObject(userTableName), userFunctionId); + this.TargetScalerDescriptor = new TargetScalerDescriptor(userFunctionId); + this._maxChangesPerWorker = maxChangesPerWorker; + } + + public TargetScalerDescriptor TargetScalerDescriptor { get; } + + public async Task GetScaleResultAsync(TargetScalerContext context) + { + SqlTriggerMetrics metrics = await this._metricsProvider.GetMetricsAsync(); + + // Instance concurrency value is set by the functions host when dynamic concurrency is enabled. See https://learn.microsoft.com/en-us/azure/azure-functions/functions-concurrency for more details. + int concurrency = context.InstanceConcurrency ?? this._maxChangesPerWorker; + + return this.GetScaleResultInternal(concurrency, metrics.UnprocessedChangeCount); + } + + internal TargetScalerResult GetScaleResultInternal(int concurrency, long unprocessedChangeCount) + { + if (concurrency < 1) + { + throw new ArgumentOutOfRangeException(nameof(concurrency), $"Unexpected concurrency='{concurrency}' - the value must be > 0."); + } + + int targetWorkerCount = (int)Math.Ceiling(unprocessedChangeCount / (decimal)concurrency); + + this._logger.LogInformation($"Target worker count for function '{this._userFunctionId}' is '{targetWorkerCount}' TableName ='{this._userTableName}', UnprocessedChangeCount ='{unprocessedChangeCount}', Concurrency='{concurrency}')."); + + return new TargetScalerResult + { + TargetWorkerCount = targetWorkerCount + }; + } + } +} \ No newline at end of file diff --git a/src/TriggerBinding/SqlTriggerUtils.cs b/src/TriggerBinding/SqlTriggerUtils.cs new file mode 100644 index 00000000..b2803808 --- /dev/null +++ 
b/src/TriggerBinding/SqlTriggerUtils.cs @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Data.SqlClient; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql +{ + public static class SqlTriggerUtils + { + + /// + /// Gets the names and types of primary key columns of the user table. + /// + /// SQL connection used to connect to user database + /// ID of the user table + /// Facilitates logging of messages + /// Name of the user table, doesn't need to be escaped since it's only used for logging + /// Cancellation token to pass to the command + /// + /// Thrown if there are no primary key columns present in the user table or if their names conflict with columns in leases table. + /// + public static async Task> GetPrimaryKeyColumnsAsync(SqlConnection connection, int userTableId, ILogger logger, string userTableName, CancellationToken cancellationToken) + { + const int NameIndex = 0, TypeIndex = 1, LengthIndex = 2, PrecisionIndex = 3, ScaleIndex = 4; + string getPrimaryKeyColumnsQuery = $@" + SELECT + c.name, + t.name, + c.max_length, + c.precision, + c.scale + FROM sys.indexes AS i + INNER JOIN sys.index_columns AS ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id + INNER JOIN sys.columns AS c ON ic.object_id = c.object_id AND ic.column_id = c.column_id + INNER JOIN sys.types AS t ON c.user_type_id = t.user_type_id + WHERE i.is_primary_key = 1 AND i.object_id = {userTableId}; + "; + logger.LogDebugWithThreadId($"BEGIN GetPrimaryKeyColumns Query={getPrimaryKeyColumnsQuery}"); + using (var getPrimaryKeyColumnsCommand = new SqlCommand(getPrimaryKeyColumnsQuery, connection)) + using (SqlDataReader reader = await 
getPrimaryKeyColumnsCommand.ExecuteReaderAsync(cancellationToken)) + { + string[] variableLengthTypes = new[] { "varchar", "nvarchar", "nchar", "char", "binary", "varbinary" }; + string[] variablePrecisionTypes = new[] { "numeric", "decimal" }; + + var primaryKeyColumns = new List<(string name, string type)>(); + + while (await reader.ReadAsync(cancellationToken)) + { + string name = reader.GetString(NameIndex); + string type = reader.GetString(TypeIndex); + + if (variableLengthTypes.Contains(type, StringComparer.OrdinalIgnoreCase)) + { + // Special "max" case. I'm actually not sure it's valid to have varchar(max) as a primary key because + // it exceeds the byte limit of an index field (900 bytes), but just in case + short length = reader.GetInt16(LengthIndex); + type += length == -1 ? "(max)" : $"({length})"; + } + else if (variablePrecisionTypes.Contains(type)) + { + byte precision = reader.GetByte(PrecisionIndex); + byte scale = reader.GetByte(ScaleIndex); + type += $"({precision},{scale})"; + } + + primaryKeyColumns.Add((name, type)); + } + + if (primaryKeyColumns.Count == 0) + { + throw new InvalidOperationException($"Could not find primary key created in table: '{userTableName}'."); + } + + logger.LogDebugWithThreadId($"END GetPrimaryKeyColumns ColumnNames(types) = {string.Join(", ", primaryKeyColumns.Select(col => $"'{col.name}({col.type})'"))}."); + return primaryKeyColumns; + } + } + + /// + /// Returns the object ID of the user table. 
+ /// + /// SQL connection used to connect to user database + /// Name of the user table + /// Facilitates logging of messages + /// Cancellation token to pass to the command + /// Thrown in case of error when querying the object ID for the user table + public static async Task GetUserTableIdAsync(SqlConnection connection, string userTableName, ILogger logger, CancellationToken cancellationToken) + { + var userTable = new SqlObject(userTableName); + string getObjectIdQuery = $"SELECT OBJECT_ID(N{userTable.QuotedFullName}, 'U');"; + + logger.LogDebugWithThreadId($"BEGIN GetUserTableId Query={getObjectIdQuery}"); + using (var getObjectIdCommand = new SqlCommand(getObjectIdQuery, connection)) + using (SqlDataReader reader = await getObjectIdCommand.ExecuteReaderAsync(cancellationToken)) + { + if (!await reader.ReadAsync(cancellationToken)) + { + throw new InvalidOperationException($"Received empty response when querying the object ID for table: '{userTableName}'."); + } + + object userTableId = reader.GetValue(0); + + if (userTableId is DBNull) + { + throw new InvalidOperationException($"Could not find table: '{userTableName}'."); + } + logger.LogDebugWithThreadId($"END GetUserTableId TableId={userTableId}"); + return (int)userTableId; + } + } + } +} \ No newline at end of file diff --git a/src/packages.lock.json b/src/packages.lock.json index ee01c2eb..fbea72fd 100644 --- a/src/packages.lock.json +++ b/src/packages.lock.json @@ -26,11 +26,11 @@ }, "Microsoft.Azure.WebJobs": { "type": "Direct", - "requested": "[3.0.33, )", - "resolved": "3.0.33", - "contentHash": "eSv858ll+GsLYxM2+RKBiTC34CvGnEgLtnrzRljzrociIXsosadHDQLxvvqu0eyIQRRf5kc4MHII/wc0HRNvyg==", + "requested": "[3.0.36, )", + "resolved": "3.0.36", + "contentHash": "S5wppmL8sUfanGmo/zDzpoWrcM+IOvGNFjpkD5XJFCFzrBqktZTEX6MpP/vF3RN6VZm2sFKegZYyce+RNhgE4Q==", "dependencies": { - "Microsoft.Azure.WebJobs.Core": "3.0.33", + "Microsoft.Azure.WebJobs.Core": "3.0.36", "Microsoft.Extensions.Configuration": "2.1.1", 
"Microsoft.Extensions.Configuration.Abstractions": "2.1.1", "Microsoft.Extensions.Configuration.EnvironmentVariables": "2.1.0", @@ -39,8 +39,8 @@ "Microsoft.Extensions.Logging": "2.1.1", "Microsoft.Extensions.Logging.Abstractions": "2.1.1", "Microsoft.Extensions.Logging.Configuration": "2.1.0", - "Newtonsoft.Json": "11.0.2", - "System.Memory.Data": "1.0.1", + "Newtonsoft.Json": "13.0.1", + "System.Memory.Data": "1.0.2", "System.Threading.Tasks.Dataflow": "4.8.0" } }, @@ -165,11 +165,12 @@ }, "Microsoft.Azure.WebJobs.Core": { "type": "Transitive", - "resolved": "3.0.33", - "contentHash": "4Rp5of6KFEGisQL7OJV2/8v3IQGFvTgZ9xSbB3bfkw5VHoNKn3Yllx7Qk/oUUX7Zey7jNu5mQFIR6dW6xNMX5Q==", + "resolved": "3.0.36", + "contentHash": "4ht/u677qxDL5rviXafYI5K9qtxee8peXQd8JDqG5KBRr5VXct2pgpEs5SnNu+gDO8dAVp+83N1IRiGG2Fl7Nw==", "dependencies": { "System.ComponentModel.Annotations": "4.4.0", - "System.Diagnostics.TraceSource": "4.3.0" + "System.Diagnostics.TraceSource": "4.3.0", + "System.Memory.Data": "1.0.1" } }, "Microsoft.Bcl.AsyncInterfaces": { diff --git a/test/Integration/SqlTriggerBindingIntegrationTests.cs b/test/Integration/SqlTriggerBindingIntegrationTests.cs index e13f2422..0b1489c7 100644 --- a/test/Integration/SqlTriggerBindingIntegrationTests.cs +++ b/test/Integration/SqlTriggerBindingIntegrationTests.cs @@ -526,15 +526,17 @@ public void ChangeTrackingNotEnabledTriggerTest() public async void GetMetricsTest() { this.SetChangeTrackingForTable("Products"); + string userFunctionId = "func-id"; IConfiguration configuration = new ConfigurationBuilder().Build(); - var listener = new SqlTriggerListener(this.DbConnectionString, "dbo.Products", "func-id", Mock.Of(), Mock.Of(), configuration); + var listener = new SqlTriggerListener(this.DbConnectionString, "dbo.Products", userFunctionId, Mock.Of(), Mock.Of(), configuration); await listener.StartAsync(CancellationToken.None); // Cancel immediately so the listener doesn't start processing the changes await 
listener.StopAsync(CancellationToken.None); - SqlTriggerMetrics metrics = await listener.GetMetricsAsync(); + var metricsProvider = new SqlTriggerMetricsProvider(this.DbConnectionString, Mock.Of(), new SqlObject("dbo.Products"), userFunctionId); + SqlTriggerMetrics metrics = await metricsProvider.GetMetricsAsync(); Assert.True(metrics.UnprocessedChangeCount == 0, "There should initially be 0 unprocessed changes"); this.InsertProducts(1, 5); - metrics = await listener.GetMetricsAsync(); + metrics = await metricsProvider.GetMetricsAsync(); Assert.True(metrics.UnprocessedChangeCount == 5, $"There should be 5 unprocessed changes after insertion. Actual={metrics.UnprocessedChangeCount}"); } diff --git a/test/Unit/TriggerBinding/SqlTriggerListenerTests.cs b/test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs similarity index 92% rename from test/Unit/TriggerBinding/SqlTriggerListenerTests.cs rename to test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs index 6f50e099..828f2df8 100644 --- a/test/Unit/TriggerBinding/SqlTriggerListenerTests.cs +++ b/test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs @@ -14,7 +14,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Unit { - public class SqlTriggerListenerTests + public class SqlTriggerScaleMonitorTests { /// /// Verifies that the scale monitor descriptor ID is set to expected value. 
@@ -81,7 +81,7 @@ public void ScaleMonitorGetScaleStatus_LastCountAboveLimit_ReturnsScaleOut(int[] ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, scaleStatus.Vote); - Assert.Contains("Requesting scale-out: Found too many unprocessed changes for table: 'testTableName' relative to the number of workers.", logMessages); + Assert.Contains($"Requesting scale-out: Found too many unprocessed changes: {unprocessedChangeCounts.Last()} for table: 'testTableName' relative to the number of workers.", string.Join(" ", logMessages)); } /// @@ -138,7 +138,7 @@ public void ScaleMonitorGetScaleStatus_CountIncreasingBelowLimit_ReturnsNone(int ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - Assert.Contains("Avoiding scale-out: Found the unprocessed changes for table: 'testTableName' to be increasing but they may not exceed the maximum limit set for the workers.", logMessages); + Assert.Contains($"Avoiding scale-out: Found the unprocessed changes: {unprocessedChangeCounts.Last()} for table: 'testTableName' to be increasing but they may not exceed the maximum limit set for the workers.", string.Join(" ", logMessages)); } /// @@ -176,7 +176,7 @@ public void ScaleMonitorGetScaleStatus_CountDecreasingAboveLimit_ReturnsNone(int ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - Assert.Contains("Avoiding scale-in: Found the unprocessed changes for table: 'testTableName' to be decreasing but they are high enough to require all existing workers for processing.", logMessages); + Assert.Contains("Avoiding scale-in: Found the unprocessed changes for table: 'testTableName' to be decreasing but they are high enough to require all existing workers for processing.", string.Join(" ", logMessages)); } /// @@ -202,27 +202,25 @@ public void ScaleMonitorGetScaleStatus_CountNotIncreasingOrDecreasing_ReturnsNon } [Theory] - [InlineData("1")] - 
[InlineData("100")] - [InlineData("10000")] - public void ScaleMonitorGetScaleStatus_UserConfiguredMaxChangesPerWorker_RespectsConfiguration(string maxChangesPerWorker) + [InlineData(1)] + [InlineData(100)] + [InlineData(10000)] + public void ScaleMonitorGetScaleStatus_UserConfiguredMaxChangesPerWorker_RespectsConfiguration(int maxChangesPerWorker) { (IScaleMonitor monitor, _) = GetScaleMonitor(maxChangesPerWorker); ScaleStatusContext context; ScaleStatus scaleStatus; - int max = int.Parse(maxChangesPerWorker); - - context = GetScaleStatusContext(new int[] { 0, 0, 0, 0, 10 * max }, 10); + context = GetScaleStatusContext(new int[] { 0, 0, 0, 0, 10 * maxChangesPerWorker }, 10); scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - context = GetScaleStatusContext(new int[] { 0, 0, 0, 0, (10 * max) + 1 }, 10); + context = GetScaleStatusContext(new int[] { 0, 0, 0, 0, (10 * maxChangesPerWorker) + 1 }, 10); scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, scaleStatus.Vote); - context = GetScaleStatusContext(new int[] { (9 * max) + 4, (9 * max) + 3, (9 * max) + 2, (9 * max) + 1, 9 * max }, 10); + context = GetScaleStatusContext(new int[] { (9 * maxChangesPerWorker) + 4, (9 * maxChangesPerWorker) + 3, (9 * maxChangesPerWorker) + 2, (9 * maxChangesPerWorker) + 1, 9 * maxChangesPerWorker }, 10); scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleIn, scaleStatus.Vote); } @@ -242,29 +240,24 @@ public void InvalidUserConfiguredMaxChangesPerWorker(string maxChangesPerWorker) private static IScaleMonitor GetScaleMonitor(string tableName, string userFunctionId) { - Mock mockConfiguration = CreateMockConfiguration(); - - return new SqlTriggerListener( - "testConnectionString", - tableName, + return new SqlTriggerScaleMonitor( userFunctionId, - Mock.Of(), - Mock.Of(), - mockConfiguration.Object); + tableName, + "testConnectionString", + SqlTriggerListener.DefaultMaxChangesPerWorker, + 
Mock.Of()); } - private static (IScaleMonitor monitor, List logMessages) GetScaleMonitor(string maxChangesPerWorker = null) + private static (IScaleMonitor monitor, List logMessages) GetScaleMonitor(int maxChangesPerWorker = SqlTriggerListener.DefaultMaxChangesPerWorker) { (Mock mockLogger, List logMessages) = CreateMockLogger(); - Mock mockConfiguration = CreateMockConfiguration(maxChangesPerWorker); - IScaleMonitor monitor = new SqlTriggerListener( - "testConnectionString", - "testTableName", + IScaleMonitor monitor = new SqlTriggerScaleMonitor( "testUserFunctionId", - Mock.Of(), - mockLogger.Object, - mockConfiguration.Object); + "testTableName", + "testConnectionString", + maxChangesPerWorker, + mockLogger.Object); return (monitor, logMessages); } diff --git a/test/Unit/TriggerBinding/SqlTriggerTargetScalerTests.cs b/test/Unit/TriggerBinding/SqlTriggerTargetScalerTests.cs new file mode 100644 index 00000000..edef5f6c --- /dev/null +++ b/test/Unit/TriggerBinding/SqlTriggerTargetScalerTests.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Unit +{ + public class SqlTriggerTargetScalerTests + { + /// + /// Verifies that the scale result returns the expected target worker count. + /// + [Theory] + [InlineData(6000, null, 6)] + [InlineData(4500, null, 5)] + [InlineData(1080, 100, 11)] + [InlineData(100, null, 1)] + public void SqlTriggerTargetScaler_Returns_Expected(int unprocessedChangeCount, int? 
concurrency, int expected) + { + var targetScaler = new SqlTriggerTargetScaler( + "testUserFunctionId", + "testUserTableName", + "testConnectionString", + SqlTriggerListener.DefaultMaxChangesPerWorker, + Mock.Of() + ); + + TargetScalerResult result = targetScaler.GetScaleResultInternal(concurrency ?? SqlTriggerListener.DefaultMaxChangesPerWorker, unprocessedChangeCount); + + Assert.Equal(result.TargetWorkerCount, expected); + } + } +} \ No newline at end of file diff --git a/test/packages.lock.json b/test/packages.lock.json index e5325c83..492b1635 100644 --- a/test/packages.lock.json +++ b/test/packages.lock.json @@ -331,11 +331,12 @@ }, "Microsoft.Azure.WebJobs.Core": { "type": "Transitive", - "resolved": "3.0.33", - "contentHash": "4Rp5of6KFEGisQL7OJV2/8v3IQGFvTgZ9xSbB3bfkw5VHoNKn3Yllx7Qk/oUUX7Zey7jNu5mQFIR6dW6xNMX5Q==", + "resolved": "3.0.36", + "contentHash": "4ht/u677qxDL5rviXafYI5K9qtxee8peXQd8JDqG5KBRr5VXct2pgpEs5SnNu+gDO8dAVp+83N1IRiGG2Fl7Nw==", "dependencies": { "System.ComponentModel.Annotations": "4.4.0", - "System.Diagnostics.TraceSource": "4.3.0" + "System.Diagnostics.TraceSource": "4.3.0", + "System.Memory.Data": "1.0.1" } }, "Microsoft.Azure.WebJobs.Extensions": { @@ -1857,7 +1858,7 @@ "dependencies": { "Microsoft.ApplicationInsights": "[2.21.0, )", "Microsoft.AspNetCore.Http": "[2.2.2, )", - "Microsoft.Azure.WebJobs": "[3.0.33, )", + "Microsoft.Azure.WebJobs": "[3.0.36, )", "Microsoft.Data.SqlClient": "[5.0.1, )", "Newtonsoft.Json": "[13.0.2, )", "System.Runtime.Caching": "[5.0.0, )", @@ -1908,11 +1909,11 @@ }, "Microsoft.Azure.WebJobs": { "type": "CentralTransitive", - "requested": "[3.0.33, )", - "resolved": "3.0.33", - "contentHash": "eSv858ll+GsLYxM2+RKBiTC34CvGnEgLtnrzRljzrociIXsosadHDQLxvvqu0eyIQRRf5kc4MHII/wc0HRNvyg==", + "requested": "[3.0.36, )", + "resolved": "3.0.36", + "contentHash": "S5wppmL8sUfanGmo/zDzpoWrcM+IOvGNFjpkD5XJFCFzrBqktZTEX6MpP/vF3RN6VZm2sFKegZYyce+RNhgE4Q==", "dependencies": { - "Microsoft.Azure.WebJobs.Core": "3.0.33", + 
"Microsoft.Azure.WebJobs.Core": "3.0.36", "Microsoft.Extensions.Configuration": "2.1.1", "Microsoft.Extensions.Configuration.Abstractions": "2.1.1", "Microsoft.Extensions.Configuration.EnvironmentVariables": "2.1.0", @@ -1921,8 +1922,8 @@ "Microsoft.Extensions.Logging": "2.1.1", "Microsoft.Extensions.Logging.Abstractions": "2.1.1", "Microsoft.Extensions.Logging.Configuration": "2.1.0", - "Newtonsoft.Json": "11.0.2", - "System.Memory.Data": "1.0.1", + "Newtonsoft.Json": "13.0.1", + "System.Memory.Data": "1.0.2", "System.Threading.Tasks.Dataflow": "4.8.0" } },