Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion Worker.Extensions.Sql/src/SqlTriggerAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,21 @@ public sealed class SqlTriggerAttribute : TriggerBindingAttribute
/// </summary>
/// <param name="tableName">Name of the table to watch for changes.</param>
/// <param name="connectionStringSetting">The name of the app setting where the SQL connection string is stored</param>
public SqlTriggerAttribute(string tableName, string connectionStringSetting)
/// <param name="leasesTableName">Optional - The name of the table used to store leases. If not specified, the leases table name will be Leases_{FunctionId}_{TableId}</param>
public SqlTriggerAttribute(string tableName, string connectionStringSetting, string leasesTableName = null)
{
this.TableName = tableName ?? throw new ArgumentNullException(nameof(tableName));
this.ConnectionStringSetting = connectionStringSetting ?? throw new ArgumentNullException(nameof(connectionStringSetting));
this.LeasesTableName = leasesTableName;
}

/// <summary>
/// Initializes a new instance of the <see cref="SqlTriggerAttribute"/> class with null value for LeasesTableName.
/// </summary>
/// <param name="tableName">Name of the table to watch for changes.</param>
/// <param name="connectionStringSetting">The name of the app setting where the SQL connection string is stored</param>
public SqlTriggerAttribute(string tableName, string connectionStringSetting) : this(tableName, connectionStringSetting, null) { }

/// <summary>
/// Name of the app setting containing the SQL connection string.
/// </summary>
Expand All @@ -28,5 +37,13 @@ public SqlTriggerAttribute(string tableName, string connectionStringSetting)
/// Name of the table to watch for changes.
/// </summary>
public string TableName { get; }

/// <summary>
/// Name of the table used to store leases.
/// If not specified, the leases table name will be Leases_{FunctionId}_{TableId}
/// More information on how this is generated can be found here
/// https://github.com/Azure/azure-functions-sql-extension/blob/release/trigger/docs/TriggerBinding.md#az_funcleases_
/// </summary>
public string LeasesTableName { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Extensions.Sql.Samples.Common;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Microsoft.Azure.WebJobs.Extensions.Sql.Samples.TriggerBindingSamples
{
public static class ProductsTriggerLeasesTableName
{
[FunctionName(nameof(ProductsTriggerLeasesTableName))]
public static void Run(
[SqlTrigger("[dbo].[Products]", "SqlConnectionString", "Leases")]
IReadOnlyList<SqlChange<Product>> changes,
ILogger logger)
{
logger.LogInformation("SQL Changes: " + JsonConvert.SerializeObject(changes));
}
}
}
13 changes: 13 additions & 0 deletions samples/samples-csx/ProductsTriggerLeasesTableName/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"bindings": [
{
"name": "changes",
"type": "sqlTrigger",
"direction": "in",
"tableName": "dbo.Products",
"connectionStringSetting": "SqlConnectionString",
"leasesTableName": "Leases"
}
],
"disabled": false
}
14 changes: 14 additions & 0 deletions samples/samples-csx/ProductsTriggerLeasesTableName/run.csx
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#load "../Common/product.csx"
#r "Newtonsoft.Json"
#r "Microsoft.Azure.WebJobs.Extensions.Sql"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.Sql;

public static void Run(IReadOnlyList<SqlChange<Product>> changes, ILogger log)
{
log.LogInformation("SQL Changes: " + JsonConvert.SerializeObject(changes));
}
13 changes: 13 additions & 0 deletions samples/samples-js/ProductsTriggerLeasesTableName/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"bindings": [
{
"name": "changes",
"type": "sqlTrigger",
"direction": "in",
"tableName": "dbo.Products",
"connectionStringSetting": "SqlConnectionString",
"leasesTableName": "Leases"
}
],
"disabled": false
}
6 changes: 6 additions & 0 deletions samples/samples-js/ProductsTriggerLeasesTableName/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

module.exports = async function (context, changes) {
context.log(`SQL Changes: ${JSON.stringify(changes)}`)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Extensions.Sql;
using Microsoft.Azure.WebJobs.Extensions.Sql.SamplesOutOfProc.Common;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;

namespace Microsoft.Azure.WebJobs.Extensions.Sql.SamplesOutOfProc.TriggerBindingSamples
{
public class ProductsTriggerLeasesTableName
{
private static readonly Action<ILogger, string, Exception> _loggerMessage = LoggerMessage.Define<string>(LogLevel.Information, eventId: new EventId(0, "INFO"), formatString: "{Message}");

[Function("ProductsTriggerLeasesTableName")]
public static void Run(
[SqlTrigger("[dbo].[Products]", "SqlConnectionString", "Leases")]
IReadOnlyList<SqlChange<Product>> changes, FunctionContext context)
{
if (changes != null && changes.Count > 0)
{
_loggerMessage(context.GetLogger("ProductsTriggerLeasesTableName"), "SQL Changes: " + JsonConvert.SerializeObject(changes), null);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"bindings": [
{
"name": "changes",
"type": "sqlTrigger",
"direction": "in",
"tableName": "dbo.Products",
"connectionStringSetting": "SqlConnectionString",
"leasesTableName": "Leases"
}
],
"disabled": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.

using namespace System.Net

param($changes)

$changesJson = $changes | ConvertTo-Json -Compress
Write-Host "SQL Changes: $changesJson"
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import json
import logging

def main(changes):
logging.info("SQL Changes: %s", json.loads(changes))
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"bindings": [
{
"name": "changes",
"type": "sqlTrigger",
"direction": "in",
"tableName": "dbo.Products",
"connectionStringSetting": "SqlConnectionString",
"leasesTableName": "Leases"
}
],
"disabled": false
}
19 changes: 18 additions & 1 deletion src/TriggerBinding/SqlTriggerAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@ public sealed class SqlTriggerAttribute : Attribute
/// </summary>
/// <param name="tableName">Name of the table to watch for changes.</param>
/// <param name="connectionStringSetting">The name of the app setting where the SQL connection string is stored</param>
public SqlTriggerAttribute(string tableName, string connectionStringSetting)
/// <param name="leasesTableName">Optional - The name of the table used to store leases. If not specified, the leases table name will be Leases_{FunctionId}_{TableId}</param>
public SqlTriggerAttribute(string tableName, string connectionStringSetting, string leasesTableName = null)
{
this.TableName = tableName ?? throw new ArgumentNullException(nameof(tableName));
this.ConnectionStringSetting = connectionStringSetting ?? throw new ArgumentNullException(nameof(connectionStringSetting));
this.LeasesTableName = leasesTableName;
}

/// <summary>
/// Initializes a new instance of the <see cref="SqlTriggerAttribute"/> class with null value for LeasesTableName.
/// </summary>
/// <param name="tableName">Name of the table to watch for changes.</param>
/// <param name="connectionStringSetting">The name of the app setting where the SQL connection string is stored</param>
public SqlTriggerAttribute(string tableName, string connectionStringSetting) : this(tableName, connectionStringSetting, null) { }

/// <summary>
/// Name of the app setting containing the SQL connection string.
/// </summary>
Expand All @@ -34,5 +43,13 @@ public SqlTriggerAttribute(string tableName, string connectionStringSetting)
/// Name of the table to watch for changes.
/// </summary>
public string TableName { get; }

/// <summary>
/// Name of the table used to store leases.
/// If not specified, the leases table name will be Leases_{FunctionId}_{TableId}
/// More information on how this is generated can be found here
/// https://github.com/Azure/azure-functions-sql-extension/blob/release/trigger/docs/TriggerBinding.md#az_funcleases_
/// </summary>
public string LeasesTableName { get; }
}
}
7 changes: 5 additions & 2 deletions src/TriggerBinding/SqlTriggerBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ internal sealed class SqlTriggerBinding<T> : ITriggerBinding
{
private readonly string _connectionString;
private readonly string _tableName;
private readonly string _leasesTableName;
private readonly ParameterInfo _parameter;
private readonly IHostIdProvider _hostIdProvider;
private readonly ILogger _logger;
Expand All @@ -40,14 +41,16 @@ internal sealed class SqlTriggerBinding<T> : ITriggerBinding
/// </summary>
/// <param name="connectionString">SQL connection string used to connect to user database</param>
/// <param name="tableName">Name of the user table</param>
/// <param name="leasesTableName">Optional - Name of the leases table</param>
/// <param name="parameter">Trigger binding parameter information</param>
/// <param name="hostIdProvider">Provider of unique host identifier</param>
/// <param name="logger">Facilitates logging of messages</param>
/// <param name="configuration">Provides configuration values</param>
public SqlTriggerBinding(string connectionString, string tableName, ParameterInfo parameter, IHostIdProvider hostIdProvider, ILogger logger, IConfiguration configuration)
public SqlTriggerBinding(string connectionString, string tableName, string leasesTableName, ParameterInfo parameter, IHostIdProvider hostIdProvider, ILogger logger, IConfiguration configuration)
{
this._connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
this._tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));
this._leasesTableName = leasesTableName;
this._parameter = parameter ?? throw new ArgumentNullException(nameof(parameter));
this._hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand All @@ -72,7 +75,7 @@ public async Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
_ = context ?? throw new ArgumentNullException(nameof(context), "Missing listener context");

string userFunctionId = await this.GetUserFunctionIdAsync();
return new SqlTriggerListener<T>(this._connectionString, this._tableName, userFunctionId, context.Executor, this._logger, this._configuration);
return new SqlTriggerListener<T>(this._connectionString, this._tableName, this._leasesTableName, userFunctionId, context.Executor, this._logger, this._configuration);
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
4 changes: 2 additions & 2 deletions src/TriggerBinding/SqlTriggerBindingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex
bindingType = typeof(SqlTriggerBinding<>).MakeGenericType(userType);
}

var constructorParameterTypes = new Type[] { typeof(string), typeof(string), typeof(ParameterInfo), typeof(IHostIdProvider), typeof(ILogger), typeof(IConfiguration) };
var constructorParameterTypes = new Type[] { typeof(string), typeof(string), typeof(string), typeof(ParameterInfo), typeof(IHostIdProvider), typeof(ILogger), typeof(IConfiguration) };
ConstructorInfo bindingConstructor = bindingType.GetConstructor(constructorParameterTypes);

object[] constructorParameterValues = new object[] { connectionString, attribute.TableName, parameter, this._hostIdProvider, this._logger, this._configuration };
object[] constructorParameterValues = new object[] { connectionString, attribute.TableName, attribute.LeasesTableName, parameter, this._hostIdProvider, this._logger, this._configuration };
var triggerBinding = (ITriggerBinding)bindingConstructor.Invoke(constructorParameterValues);

return Task.FromResult(triggerBinding);
Expand Down
2 changes: 2 additions & 0 deletions src/TriggerBinding/SqlTriggerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ internal static class SqlTriggerConstants

public const string LeasesTableNameFormat = "[" + SchemaName + "].[Leases_{0}]";

public const string UserDefinedLeasesTableNameFormat = "[" + SchemaName + "].{0}";

public const string LeasesTableChangeVersionColumnName = "_az_func_ChangeVersion";
public const string LeasesTableAttemptCountColumnName = "_az_func_AttemptCount";
public const string LeasesTableLeaseExpirationTimeColumnName = "_az_func_LeaseExpirationTime";
Expand Down
12 changes: 7 additions & 5 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -40,6 +39,7 @@ internal sealed class SqlTriggerListener<T> : IListener, IScaleMonitorProvider,

private readonly SqlObject _userTable;
private readonly string _connectionString;
private readonly string _userDefinedLeasesTableName;
private readonly string _userFunctionId;
private readonly ITriggeredFunctionExecutor _executor;
private readonly ILogger _logger;
Expand All @@ -61,14 +61,16 @@ internal sealed class SqlTriggerListener<T> : IListener, IScaleMonitorProvider,
/// </summary>
/// <param name="connectionString">SQL connection string used to connect to user database</param>
/// <param name="tableName">Name of the user table</param>
/// <param name="userDefinedLeasesTableName">Optional - Name of the leases table</param>
/// <param name="userFunctionId">Unique identifier for the user function</param>
/// <param name="executor">Defines contract for triggering user function</param>
/// <param name="logger">Facilitates logging of messages</param>
/// <param name="configuration">Provides configuration values</param>
public SqlTriggerListener(string connectionString, string tableName, string userFunctionId, ITriggeredFunctionExecutor executor, ILogger logger, IConfiguration configuration)
public SqlTriggerListener(string connectionString, string tableName, string userDefinedLeasesTableName, string userFunctionId, ITriggeredFunctionExecutor executor, ILogger logger, IConfiguration configuration)
{
this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
this._userTable = !string.IsNullOrEmpty(tableName) ? new SqlObject(tableName) : throw new ArgumentNullException(nameof(tableName));
this._userDefinedLeasesTableName = userDefinedLeasesTableName;
this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(nameof(userFunctionId));
this._executor = executor ?? throw new ArgumentNullException(nameof(executor));
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand All @@ -82,8 +84,8 @@ public SqlTriggerListener(string connectionString, string tableName, string user
}
this._hasConfiguredMaxChangesPerWorker = configuredMaxChangesPerWorker != null;

this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable, this._connectionString, this._maxChangesPerWorker, this._logger);
this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable, this._connectionString, this._maxChangesPerWorker, this._logger);
this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable, this._userDefinedLeasesTableName, this._connectionString, this._maxChangesPerWorker, this._logger);
this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable, this._userDefinedLeasesTableName, this._connectionString, this._maxChangesPerWorker, this._logger);
}

public void Cancel()
Expand Down Expand Up @@ -123,7 +125,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
IReadOnlyList<(string name, string type)> primaryKeyColumns = await GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken);
IReadOnlyList<string> userTableColumns = await this.GetUserTableColumnsAsync(connection, userTableId, cancellationToken);

string leasesTableName = string.Format(CultureInfo.InvariantCulture, LeasesTableNameFormat, $"{this._userFunctionId}_{userTableId}");
string leasesTableName = GetLeasesTableName(this._userDefinedLeasesTableName, this._userFunctionId, userTableId);
this._telemetryProps[TelemetryPropertyName.LeasesTableName] = leasesTableName;

var transactionSw = Stopwatch.StartNew();
Expand Down
Loading