[LogicApps Hybrid] Moving all queries to get unprocessed rows to a StoredProcedure #1159

Closed
ishank12 opened this issue Jan 6, 2025 · 2 comments

Comments

@ishank12

ishank12 commented Jan 6, 2025

Is your feature request related to a problem? Please describe.
This feature request concerns SQL trigger scaling when the Functions host runs in Kubernetes and relies on KEDA scalers for scaling.

This is how it works currently.
Image

In ACA, however, SQL trigger scaling requires the KEDA mssql scaler. Our initial approach is shown below.
Image

This works fine with a connection string, because the dataplane uses the user's connection string.
It does not work for managed-identity-based connections, because the dataplane does not have the app's identity.

The dataplane needs access to SQL Server only to create the stored procedure. If the SQL extension itself created the stored procedure, the dataplane would not need to access SQL Server at all.

That would be cleaner.

Describe the solution or feature you'd like
Move the queries that get unprocessed rows into a stored procedure, so that the SQL extension creates the stored procedure and the KEDA scaler can execute it as well.
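As a rough illustration of the scaler side of this proposal, here is a minimal Go sketch. It assumes the stored procedure returns a single count of unprocessed rows and that the scaler is configured with a query and a target value. The schema and procedure names, `scalerQuery`, and `desiredReplicas` are hypothetical helpers for illustration, not part of KEDA or the SQL extension; the actual replica calculation is done by the Kubernetes autoscaler, and this only approximates it.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent brackets a SQL Server identifier and doubles any "]",
// mirroring the default behavior of T-SQL QUOTENAME.
func quoteIdent(name string) string {
	return "[" + strings.ReplaceAll(name, "]", "]]") + "]"
}

// scalerQuery builds the statement a scaler could run on each poll.
// Both arguments are assumptions: the issue does not fix a schema or name.
func scalerQuery(schema, procName string) string {
	return fmt.Sprintf("EXEC %s.%s;", quoteIdent(schema), quoteIdent(procName))
}

// desiredReplicas approximates target-value scaling:
// ceil(unprocessed / targetValue), clamped to [0, maxReplicas].
func desiredReplicas(unprocessed, targetValue, maxReplicas int64) int64 {
	if unprocessed <= 0 || targetValue <= 0 {
		return 0
	}
	n := (unprocessed + targetValue - 1) / targetValue
	if n > maxReplicas {
		return maxReplicas
	}
	return n
}
```

With this shape, the scaler only needs permission to execute one procedure; it never has to know the lease table layout or the change tracking query.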


@JetterMcTedder

I think you may be better served by using a stored procedure that you create and interact with change tracking directly.

@JetterMcTedder removed this from the Backlog milestone Jan 9, 2025
@ishank12
Author

The problem with the dataplane creating the stored procedure is that it cannot access SQL Server with the identities the user has assigned to the app. It would therefore be better for the SQL extension itself to create the stored procedure at initialization time, with KEDA executing the stored procedure at regular intervals.
Here is how it would look.

Image

The stored procedure below is one we tested, and it works fine.

procScript := fmt.Sprintf(`
CREATE OR ALTER PROCEDURE %s
AS
BEGIN
  DECLARE @UserFunctionID NVARCHAR(255) = '%s';
  DECLARE @CustomerTableName NVARCHAR(255) = '%s';
  DECLARE @UserDefinedLeaseTableName NVARCHAR(255) = '%s';
  DECLARE @UserTableID NVARCHAR(255) = OBJECT_ID(QUOTENAME(@CustomerTableName), 'U');
  DECLARE @LeaseTableName NVARCHAR(255) = N'[az_func].[Leases_' + @UserFunctionID + '_' + @UserTableID + N']';
  DECLARE @last_sync_version bigint;
  DECLARE @JoinCondition NVARCHAR(MAX) = '';
  DECLARE @SQL NVARCHAR(MAX);

  -- Set LeaseTableName based on UserDefinedLeaseTableName
  IF @UserDefinedLeaseTableName <> ''
  BEGIN
    SET @LeaseTableName = N'[az_func].' + QUOTENAME(@UserDefinedLeaseTableName);
  END

  -- Get the LastSyncVersion
  SELECT @last_sync_version = LastSyncVersion
  FROM [az_func].[GlobalState]
  WHERE UserFunctionID = @UserFunctionID AND UserTableID = @UserTableID;

  -- Construct the join condition dynamically for primary key columns
  SELECT @JoinCondition = STRING_AGG('c.' + QUOTENAME(c.name) + ' = l.' + QUOTENAME(c.name), ' AND ')
  FROM sys.indexes AS i
  INNER JOIN sys.index_columns AS ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id
  INNER JOIN sys.columns AS c ON ic.object_id = c.object_id AND ic.column_id = c.column_id
  WHERE i.is_primary_key = 1 AND i.object_id = @UserTableID;

  -- Create the dynamic SQL query
  SET @SQL = N'SELECT COUNT_BIG(*) ' +
      N'FROM CHANGETABLE(CHANGES ' + QUOTENAME(@CustomerTableName) + N', @last_sync_version) AS c ' +
      N'LEFT OUTER JOIN ' + @LeaseTableName + N' AS l ON ' + @JoinCondition + N' ' +
      N'WHERE (l._az_func_LeaseExpirationTime IS NULL ' +
      N'AND (l._az_func_ChangeVersion IS NULL OR l._az_func_ChangeVersion < c.SYS_CHANGE_VERSION) ' +
      N'OR l._az_func_LeaseExpirationTime < SYSDATETIME()) ' +
      N'AND (l._az_func_AttemptCount IS NULL OR l._az_func_AttemptCount < 5);';

  -- Execute the dynamic SQL with @last_sync_version as a parameter
  EXEC sp_executesql @SQL, N'@last_sync_version bigint', @last_sync_version;
END;
`, storedProcedureName, userFunctionID, customerTableName, leaseTableName)
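Because the script above interpolates values into T-SQL with `fmt.Sprintf`, the caller may want to escape them first. The sketch below shows one possible way to do that in Go, along with a helper that mirrors the lease table naming used in the procedure. `escapeLiteral` and `leaseTableName` are hypothetical names introduced here for illustration; they are not part of the SQL extension.

```go
package main

import (
	"fmt"
	"strings"
)

// escapeLiteral doubles single quotes so a value can sit inside a
// T-SQL string literal such as '%s' without terminating it early.
func escapeLiteral(s string) string {
	return strings.ReplaceAll(s, "'", "''")
}

// leaseTableName mirrors the naming logic in the stored procedure:
// a user-defined name wins; otherwise the name is derived from the
// function ID and the table's object ID.
func leaseTableName(userFunctionID string, userTableID int64, userDefined string) string {
	if userDefined != "" {
		// Same escaping as T-SQL QUOTENAME: double any closing bracket.
		return "[az_func].[" + strings.ReplaceAll(userDefined, "]", "]]") + "]"
	}
	return fmt.Sprintf("[az_func].[Leases_%s_%d]", userFunctionID, userTableID)
}
```

Passing `escapeLiteral(userFunctionID)` and similar escaped values to `fmt.Sprintf` would keep a stray quote in an input from changing the generated script.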

@JetterMcTedder closed this as not planned Jan 17, 2025