Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0410f88
Refactor SqlTriggerListener scaling to SqlTriggerMetricsProvider and …
AmeyaRele Dec 1, 2022
4861d58
Create SqlTriggerTargetScaler
AmeyaRele Dec 2, 2022
6434a33
Refactor unit tests
AmeyaRele Dec 21, 2022
221c577
Refactor to include common queries for scaling and listener class to …
AmeyaRele Jan 11, 2023
c5805e4
Add doc comments for scaling classes
AmeyaRele Jan 18, 2023
e6e7973
Update src/TriggerBinding/SqlTriggerTargetScaler.cs
AmeyaRele Jan 19, 2023
a437d2c
Fix log statement
AmeyaRele Jan 23, 2023
f239b09
Merge branch 'ameyarele/target-based-scaling' of https://github.com/A…
AmeyaRele Jan 23, 2023
1621a09
Update WebJobs package
AmeyaRele Feb 15, 2023
3e5961c
Update nuget.config
AmeyaRele Feb 15, 2023
a16a5ea
Address review comments
AmeyaRele Feb 16, 2023
0636741
Address review comments pt2
AmeyaRele Feb 21, 2023
d9e1201
Update src/TriggerBinding/SqlTriggerTargetScaler.cs
AmeyaRele Feb 21, 2023
4bbaefc
Merge branch 'main' into ameyarele/target-based-scaling
AmeyaRele Feb 28, 2023
f9335a8
Address comments, test failures
AmeyaRele Mar 2, 2023
57f1dec
Merge branch 'main' into ameyarele/target-based-scaling
Charles-Gagnon Mar 2, 2023
dff5d1d
Fix packages lock file
AmeyaRele Mar 8, 2023
6cd9d88
Fix error message
AmeyaRele Mar 8, 2023
cf1aacb
Address comments and test failures
AmeyaRele Mar 9, 2023
bdc291b
Apply suggestions from code review
AmeyaRele Mar 9, 2023
b821935
Merge branch 'main' into ameyarele/target-based-scaling
Charles-Gagnon Mar 9, 2023
79c1d9f
Merge branch 'release/trigger' into ameyarele/target-based-scaling
Charles-Gagnon Mar 15, 2023
05a15e8
Change in documentation
AmeyaRele Mar 22, 2023
601c670
Merge branch 'release/trigger' into ameyarele/target-based-scaling
AmeyaRele Mar 22, 2023
10763d8
Fix log level
AmeyaRele Mar 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<PackageVersion Include="Microsoft.NET.Sdk.Functions" Version="4.1.3" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Abstractions" Version="1.1.0" />
<PackageVersion Include="Microsoft.ApplicationInsights" Version="2.17.0" />
<PackageVersion Include="Microsoft.Azure.WebJobs" Version="3.0.32" />
<PackageVersion Include="Microsoft.Azure.WebJobs" Version="3.0.36" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.0.1" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
<PackageVersion Include="morelinq" Version="3.3.2" />
Expand Down
3 changes: 2 additions & 1 deletion nuget.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
<clear />
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" />
</packageSources>
</configuration>
</configuration>

61 changes: 31 additions & 30 deletions performance/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,12 @@
},
"Microsoft.Azure.WebJobs.Core": {
"type": "Transitive",
"resolved": "3.0.32",
"contentHash": "pW5lyF0Tno1cC2VkmBLyv7E3o5ObDdbn3pfpUpKdksJo9ysCdQTpgc0Ib99wPHca6BgvoglicGbDYXuatanMfg==",
"resolved": "3.0.36",
"contentHash": "4ht/u677qxDL5rviXafYI5K9qtxee8peXQd8JDqG5KBRr5VXct2pgpEs5SnNu+gDO8dAVp+83N1IRiGG2Fl7Nw==",
"dependencies": {
"System.ComponentModel.Annotations": "4.4.0",
"System.Diagnostics.TraceSource": "4.3.0"
"System.Diagnostics.TraceSource": "4.3.0",
"System.Memory.Data": "1.0.1"
}
},
"Microsoft.Azure.WebJobs.Extensions": {
Expand Down Expand Up @@ -2081,37 +2082,37 @@
"microsoft.azure.webjobs.extensions.sql": {
"type": "Project",
"dependencies": {
"Microsoft.ApplicationInsights": "[2.17.0, )",
"Microsoft.AspNetCore.Http": "[2.2.2, )",
"Microsoft.Azure.WebJobs": "[3.0.32, )",
"Microsoft.Data.SqlClient": "[5.0.1, )",
"Newtonsoft.Json": "[13.0.1, )",
"System.Runtime.Caching": "[5.0.0, )",
"morelinq": "[3.3.2, )"
"Microsoft.ApplicationInsights": "2.17.0",
"Microsoft.AspNetCore.Http": "2.2.2",
"Microsoft.Azure.WebJobs": "3.0.36",
"Microsoft.Data.SqlClient": "5.0.1",
"Newtonsoft.Json": "13.0.1",
"System.Runtime.Caching": "5.0.0",
"morelinq": "3.3.2"
}
},
"microsoft.azure.webjobs.extensions.sql.samples": {
"type": "Project",
"dependencies": {
"Microsoft.AspNetCore.Http": "[2.2.2, )",
"Microsoft.Azure.WebJobs.Extensions.Sql": "[99.99.99, )",
"Microsoft.Azure.WebJobs.Extensions.Storage": "[5.0.0, )",
"Microsoft.NET.Sdk.Functions": "[4.1.3, )",
"Newtonsoft.Json": "[13.0.1, )"
"Microsoft.AspNetCore.Http": "2.2.2",
"Microsoft.Azure.WebJobs.Extensions.Sql": "99.99.99",
"Microsoft.Azure.WebJobs.Extensions.Storage": "5.0.0",
"Microsoft.NET.Sdk.Functions": "4.1.3",
"Newtonsoft.Json": "13.0.1"
}
},
"microsoft.azure.webjobs.extensions.sql.tests": {
"type": "Project",
"dependencies": {
"Microsoft.AspNetCore.Http": "[2.2.2, )",
"Microsoft.Azure.WebJobs.Extensions.Sql": "[99.99.99, )",
"Microsoft.Azure.WebJobs.Extensions.Sql.Samples": "[1.0.0, )",
"Microsoft.NET.Sdk.Functions": "[4.1.3, )",
"Microsoft.NET.Test.Sdk": "[17.0.0, )",
"Moq": "[4.14.3, )",
"Newtonsoft.Json": "[13.0.1, )",
"xunit": "[2.4.0, )",
"xunit.runner.visualstudio": "[2.4.0, )"
"Microsoft.AspNetCore.Http": "2.2.2",
"Microsoft.Azure.WebJobs.Extensions.Sql": "99.99.99",
"Microsoft.Azure.WebJobs.Extensions.Sql.Samples": "1.0.0",
"Microsoft.NET.Sdk.Functions": "4.1.3",
"Microsoft.NET.Test.Sdk": "17.0.0",
"Moq": "4.14.3",
"Newtonsoft.Json": "13.0.1",
"xunit": "2.4.0",
"xunit.runner.visualstudio": "2.4.0"
}
},
"Microsoft.ApplicationInsights": {
Expand Down Expand Up @@ -2161,11 +2162,11 @@
},
"Microsoft.Azure.WebJobs": {
"type": "CentralTransitive",
"requested": "[3.0.32, )",
"resolved": "3.0.32",
"contentHash": "uN8GsFqPFHHcSrwwj/+0tGe6F6cOwugqUiePPw7W3TL9YC594+Hw8GBK5S/fcDWXacqvRRGf9nDX8xP94/Yiyw==",
"requested": "[3.0.36, )",
"resolved": "3.0.36",
"contentHash": "S5wppmL8sUfanGmo/zDzpoWrcM+IOvGNFjpkD5XJFCFzrBqktZTEX6MpP/vF3RN6VZm2sFKegZYyce+RNhgE4Q==",
"dependencies": {
"Microsoft.Azure.WebJobs.Core": "3.0.32",
"Microsoft.Azure.WebJobs.Core": "3.0.36",
"Microsoft.Extensions.Configuration": "2.1.1",
"Microsoft.Extensions.Configuration.Abstractions": "2.1.1",
"Microsoft.Extensions.Configuration.EnvironmentVariables": "2.1.0",
Expand All @@ -2174,8 +2175,8 @@
"Microsoft.Extensions.Logging": "2.1.1",
"Microsoft.Extensions.Logging.Abstractions": "2.1.1",
"Microsoft.Extensions.Logging.Configuration": "2.1.0",
"Newtonsoft.Json": "11.0.2",
"System.Memory.Data": "1.0.1",
"Newtonsoft.Json": "13.0.1",
"System.Memory.Data": "1.0.2",
"System.Threading.Tasks.Dataflow": "4.8.0"
}
},
Expand Down
33 changes: 17 additions & 16 deletions samples/samples-csharp/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,12 @@
},
"Microsoft.Azure.WebJobs.Core": {
"type": "Transitive",
"resolved": "3.0.32",
"contentHash": "pW5lyF0Tno1cC2VkmBLyv7E3o5ObDdbn3pfpUpKdksJo9ysCdQTpgc0Ib99wPHca6BgvoglicGbDYXuatanMfg==",
"resolved": "3.0.36",
"contentHash": "4ht/u677qxDL5rviXafYI5K9qtxee8peXQd8JDqG5KBRr5VXct2pgpEs5SnNu+gDO8dAVp+83N1IRiGG2Fl7Nw==",
"dependencies": {
"System.ComponentModel.Annotations": "4.4.0",
"System.Diagnostics.TraceSource": "4.3.0"
"System.Diagnostics.TraceSource": "4.3.0",
"System.Memory.Data": "1.0.1"
}
},
"Microsoft.Azure.WebJobs.Extensions": {
Expand Down Expand Up @@ -1720,13 +1721,13 @@
"microsoft.azure.webjobs.extensions.sql": {
"type": "Project",
"dependencies": {
"Microsoft.ApplicationInsights": "[2.17.0, )",
"Microsoft.AspNetCore.Http": "[2.2.2, )",
"Microsoft.Azure.WebJobs": "[3.0.32, )",
"Microsoft.Data.SqlClient": "[5.0.1, )",
"Newtonsoft.Json": "[13.0.1, )",
"System.Runtime.Caching": "[5.0.0, )",
"morelinq": "[3.3.2, )"
"Microsoft.ApplicationInsights": "2.17.0",
"Microsoft.AspNetCore.Http": "2.2.2",
"Microsoft.Azure.WebJobs": "3.0.36",
"Microsoft.Data.SqlClient": "5.0.1",
"Newtonsoft.Json": "13.0.1",
"System.Runtime.Caching": "5.0.0",
"morelinq": "3.3.2"
}
},
"Microsoft.ApplicationInsights": {
Expand Down Expand Up @@ -1763,11 +1764,11 @@
},
"Microsoft.Azure.WebJobs": {
"type": "CentralTransitive",
"requested": "[3.0.32, )",
"resolved": "3.0.32",
"contentHash": "uN8GsFqPFHHcSrwwj/+0tGe6F6cOwugqUiePPw7W3TL9YC594+Hw8GBK5S/fcDWXacqvRRGf9nDX8xP94/Yiyw==",
"requested": "[3.0.36, )",
"resolved": "3.0.36",
"contentHash": "S5wppmL8sUfanGmo/zDzpoWrcM+IOvGNFjpkD5XJFCFzrBqktZTEX6MpP/vF3RN6VZm2sFKegZYyce+RNhgE4Q==",
"dependencies": {
"Microsoft.Azure.WebJobs.Core": "3.0.32",
"Microsoft.Azure.WebJobs.Core": "3.0.36",
"Microsoft.Extensions.Configuration": "2.1.1",
"Microsoft.Extensions.Configuration.Abstractions": "2.1.1",
"Microsoft.Extensions.Configuration.EnvironmentVariables": "2.1.0",
Expand All @@ -1776,8 +1777,8 @@
"Microsoft.Extensions.Logging": "2.1.1",
"Microsoft.Extensions.Logging.Abstractions": "2.1.1",
"Microsoft.Extensions.Logging.Configuration": "2.1.0",
"Newtonsoft.Json": "11.0.2",
"System.Memory.Data": "1.0.1",
"Newtonsoft.Json": "13.0.1",
"System.Memory.Data": "1.0.2",
"System.Threading.Tasks.Dataflow": "4.8.0"
}
},
Expand Down
6 changes: 3 additions & 3 deletions samples/samples-outofproc/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -830,9 +830,9 @@
"microsoft.azure.functions.worker.extension.sql": {
"type": "Project",
"dependencies": {
"Microsoft.Azure.Functions.Worker.Extensions.Abstractions": "[1.1.0, )",
"Microsoft.Data.SqlClient": "[5.0.1, )",
"System.Drawing.Common": "[5.0.3, )"
"Microsoft.Azure.Functions.Worker.Extensions.Abstractions": "1.1.0",
"Microsoft.Data.SqlClient": "5.0.1",
"System.Drawing.Common": "5.0.3"
}
},
"Microsoft.Azure.Functions.Worker.Extensions.Abstractions": {
Expand Down
95 changes: 0 additions & 95 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,69 +169,6 @@ public void Dispose()
this._cancellationTokenSourceCheckForChanges.Cancel();
}

public async Task<long> GetUnprocessedChangeCountAsync()
{
long unprocessedChangeCount = 0L;

try
{
long getUnprocessedChangesDurationMs = 0L;

using (var connection = new SqlConnection(this._connectionString))
{
this._logger.LogDebugWithThreadId("BEGIN OpenGetUnprocessedChangesConnection");
await connection.OpenAsync();
this._logger.LogDebugWithThreadId("END OpenGetUnprocessedChangesConnection");

// Use a transaction to automatically release the app lock when we're done executing the query
using (SqlTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
{
try
{
using (SqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection, transaction))
{
this._logger.LogInformation("Getting change count");
this._logger.LogDebugWithThreadId($"BEGIN GetUnprocessedChangeCount Query={getUnprocessedChangesCommand.CommandText}");
var commandSw = Stopwatch.StartNew();
unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsync();
getUnprocessedChangesDurationMs = commandSw.ElapsedMilliseconds;
}

this._logger.LogDebugWithThreadId($"END GetUnprocessedChangeCount Duration={getUnprocessedChangesDurationMs}ms Count={unprocessedChangeCount}");
transaction.Commit();
}
catch (Exception)
{
try
{
transaction.Rollback();
}
catch (Exception ex2)
{
this._logger.LogError($"GetUnprocessedChangeCount : Failed to rollback transaction due to exception: {ex2.GetType()}. Exception message: {ex2.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCountRollback, ex2, this._telemetryProps);
}
throw;
}
}
}

var measures = new Dictionary<TelemetryMeasureName, double>
{
[TelemetryMeasureName.GetUnprocessedChangesDurationMs] = getUnprocessedChangesDurationMs,
[TelemetryMeasureName.UnprocessedChangeCount] = unprocessedChangeCount,
};
}
catch (Exception ex)
{
this._logger.LogError($"Failed to query count of unprocessed changes for table '{this._userTable.FullName}' due to exception: {ex.GetType()}. Exception message: {ex.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.GetUnprocessedChangeCount, ex, this._telemetryProps);
throw;
}

return unprocessedChangeCount;
}

/// <summary>
/// Executed once every <see cref="_pollingIntervalInMs"/> period. If the state of the change monitor is
/// <see cref="State.CheckingForChanges"/>, then the method query the change/leases tables for changes on the
Expand Down Expand Up @@ -819,38 +756,6 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo
return new SqlCommand(getChangesQuery, connection, transaction);
}

/// <summary>
/// Builds the query to get count of unprocessed changes in the user's table. This one mimics the query that is
/// used by workers to get the changes for processing.
/// </summary>
/// <param name="connection">The connection to add to the returned SqlCommand</param>
/// <param name="transaction">The transaction to add to the returned SqlCommand</param>
/// <returns>The SqlCommand populated with the query and appropriate parameters</returns>
private SqlCommand BuildGetUnprocessedChangesCommand(SqlConnection connection, SqlTransaction transaction)
{
string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}"));

string getUnprocessedChangesQuery = $@"
{AppLockStatements}

DECLARE @last_sync_version bigint;
SELECT @last_sync_version = LastSyncVersion
FROM {GlobalStateTableName}
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};

SELECT COUNT_BIG(*)
FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c
LEFT OUTER JOIN {this._leasesTableName} AS l ON {leasesTableJoinCondition}
WHERE
(l.{LeasesTableLeaseExpirationTimeColumnName} IS NULL AND
(l.{LeasesTableChangeVersionColumnName} IS NULL OR l.{LeasesTableChangeVersionColumnName} < c.{SysChangeVersionColumnName}) OR
l.{LeasesTableLeaseExpirationTimeColumnName} < SYSDATETIME()) AND
(l.{LeasesTableAttemptCountColumnName} IS NULL OR l.{LeasesTableAttemptCountColumnName} < {MaxChangeProcessAttemptCount});
";

return new SqlCommand(getUnprocessedChangesQuery, connection, transaction);
}

/// <summary>
/// Builds the query to acquire leases on the rows in "_rows" if changes are detected in the user's table
/// (<see cref="RunChangeConsumptionLoopAsync()"/>).
Expand Down
Loading