diff --git a/samples/samples-csharp/host.json b/samples/samples-csharp/host.json index dff22557b..ae18fbad0 100644 --- a/samples/samples-csharp/host.json +++ b/samples/samples-csharp/host.json @@ -8,7 +8,7 @@ } }, "logLevel": { - "default": "Information" + "default": "Debug" } } } \ No newline at end of file diff --git a/src/SqlBindingConfigProvider.cs b/src/SqlBindingConfigProvider.cs index f19efadff..85b5f9498 100644 --- a/src/SqlBindingConfigProvider.cs +++ b/src/SqlBindingConfigProvider.cs @@ -87,7 +87,7 @@ private static void LogDependentAssemblyVersions(ILogger logger) { try { - logger.LogInformation($"Using {assembly.GetName().Name} {FileVersionInfo.GetVersionInfo(assembly.Location).ProductVersion}"); + logger.LogDebug($"Using {assembly.GetName().Name} {FileVersionInfo.GetVersionInfo(assembly.Location).ProductVersion}"); } catch (Exception ex) { diff --git a/src/SqlBindingUtilities.cs b/src/SqlBindingUtilities.cs index 5468c8716..1882666ec 100644 --- a/src/SqlBindingUtilities.cs +++ b/src/SqlBindingUtilities.cs @@ -372,6 +372,26 @@ public static async Task GetServerTelemetryProperties(SqlConne return null; } + /// + /// Calls ExecuteScalarAsync and logs an error if it fails before rethrowing. + /// + /// The SqlCommand being executed + /// The logger + /// The cancellation token to pass to the call + /// The result of the call + public static async Task ExecuteScalarAsyncWithLogging(this SqlCommand cmd, ILogger logger, CancellationToken cancellationToken) + { + try + { + return await cmd.ExecuteScalarAsync(cancellationToken); + } + catch (Exception e) + { + logger.LogError($"Exception executing query. Message={e.Message}\nQuery={cmd.CommandText}"); + throw; + } + } + /// /// Calls ExecuteNonQueryAsync and logs an error if it fails before rethrowing. /// diff --git a/src/SqlConverters.cs b/src/SqlConverters.cs index 809859f29..99ad312c8 100644 --- a/src/SqlConverters.cs +++ b/src/SqlConverters.cs @@ -170,7 +170,6 @@ public virtual async Task BuildItemFromAttributeAsync(SqlAttribute attri TelemetryInstance.TrackConvert(type, props); var dataTable = new DataTable(); adapter.Fill(dataTable); - this._logger.LogInformation($"{dataTable.Rows.Count} row(s) queried from database: {connection.Database} using Command: {command.CommandText}"); // Serialize any DateTime objects in UTC format var jsonSerializerSettings = new JsonSerializerSettings() { diff --git a/src/Telemetry/Telemetry.cs b/src/Telemetry/Telemetry.cs index 84b0f16af..ff7b666d4 100644 --- a/src/Telemetry/Telemetry.cs +++ b/src/Telemetry/Telemetry.cs @@ -52,7 +52,6 @@ public void Initialize(IConfiguration config, ILogger logger) this.Enabled = !(Utils.GetEnvironmentVariableAsBool(TelemetryOptoutEnvVar) || Utils.GetConfigSettingAsBool(TelemetryOptoutSetting, config)); if (!this.Enabled) { - this._logger.LogInformation("Telemetry disabled"); return; } this._logger.LogInformation(WelcomeMessage); diff --git a/src/TriggerBinding/SqlTableChangeMonitor.cs b/src/TriggerBinding/SqlTableChangeMonitor.cs index 6738b5dae..0af9b36cc 100644 --- a/src/TriggerBinding/SqlTableChangeMonitor.cs +++ b/src/TriggerBinding/SqlTableChangeMonitor.cs @@ -189,7 +189,7 @@ public void Dispose() /// private async Task RunChangeConsumptionLoopAsync() { - this._logger.LogInformation($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); + this._logger.LogDebug($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}"); try { @@ -197,9 +197,7 @@ private async Task RunChangeConsumptionLoopAsync() using (var connection = new SqlConnection(this._connectionString)) { - this._logger.LogDebug("BEGIN OpenChangeConsumptionConnection"); await connection.OpenAsync(token); - this._logger.LogDebug("END OpenChangeConsumptionConnection"); bool forceReconnect = false; // Check for cancellation request only after a cycle of checking and processing of changes completes. @@ -216,7 +214,6 @@ private async Task RunChangeConsumptionLoopAsync() { forceReconnect = false; } - this._logger.LogDebug($"BEGIN ProcessingChanges State={this._state}"); try { @@ -243,8 +240,6 @@ private async Task RunChangeConsumptionLoopAsync() this._logger.LogError($"Fatal SQL Client exception processing changes. Will attempt to reestablish connection in {this._pollingIntervalInMs}ms. Exception = {e.Message}"); forceReconnect = true; } - this._logger.LogDebug("END ProcessingChanges"); - this._logger.LogDebug($"Delaying for {this._pollingIntervalInMs}ms"); await Task.Delay(TimeSpan.FromMilliseconds(this._pollingIntervalInMs), token); } } @@ -276,7 +271,6 @@ private async Task RunChangeConsumptionLoopAsync() /// private async Task GetTableChangesAsync(SqlConnection connection, CancellationToken token) { - this._logger.LogDebug("BEGIN GetTableChanges"); try { var transactionSw = Stopwatch.StartNew(); @@ -289,19 +283,16 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo // Update the version number stored in the global state table if necessary before using it. using (SqlCommand updateTablesPreInvocationCommand = this.BuildUpdateTablesPreInvocation(connection, transaction)) { - this._logger.LogDebug($"BEGIN UpdateTablesPreInvocation Query={updateTablesPreInvocationCommand.CommandText}"); var commandSw = Stopwatch.StartNew(); - await updateTablesPreInvocationCommand.ExecuteNonQueryAsync(token); + await updateTablesPreInvocationCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token); setLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds; } - this._logger.LogDebug($"END UpdateTablesPreInvocation Duration={setLastSyncVersionDurationMs}ms"); var rows = new List>(); // Use the version number to query for new changes. using (SqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction)) { - this._logger.LogDebug($"BEGIN GetChanges Query={getChangesCommand.CommandText}"); var commandSw = Stopwatch.StartNew(); using (SqlDataReader reader = await getChangesCommand.ExecuteReaderAsync(token)) @@ -314,19 +305,16 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo getChangesDurationMs = commandSw.ElapsedMilliseconds; } - this._logger.LogDebug($"END GetChanges Duration={getChangesDurationMs}ms ChangedRows={rows.Count}"); // If changes were found, acquire leases on them. if (rows.Count > 0) { using (SqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction, rows)) { - this._logger.LogDebug($"BEGIN AcquireLeases Query={acquireLeasesCommand.CommandText}"); var commandSw = Stopwatch.StartNew(); - await acquireLeasesCommand.ExecuteNonQueryAsync(token); + await acquireLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token); acquireLeasesDurationMs = commandSw.ElapsedMilliseconds; } - this._logger.LogDebug($"END AcquireLeases Duration={acquireLeasesDurationMs}ms"); // Only send event if we got changes to reduce the overall number of events sent since we generally // only care about the times that we had to actually retrieve and process rows @@ -375,12 +363,10 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo throw; } } - this._logger.LogDebug("END GetTableChanges"); } private async Task ProcessTableChangesAsync() { - this._logger.LogDebug("BEGIN ProcessTableChanges"); if (this._rowsToProcess.Count > 0) { IReadOnlyList> changes = null; @@ -407,7 +393,6 @@ private async Task ProcessTableChangesAsync() { var input = new TriggeredFunctionData() { TriggerValue = changes }; - this._logger.LogDebug("Executing triggered function"); var stopwatch = Stopwatch.StartNew(); FunctionResult result = await this._executor.TryExecuteAsync(input, this._cancellationTokenSourceExecutor.Token); @@ -419,7 +404,6 @@ private async Task ProcessTableChangesAsync() }; if (result.Succeeded) { - this._logger.LogDebug($"Successfully triggered function. Duration={durationMs}ms"); TelemetryInstance.TrackEvent(TelemetryEventName.TriggerFunction, this._telemetryProps, measures); // We've successfully fully processed these so set them to be released in the cleanup phase this._rowsToRelease = this._rowsToProcess; @@ -429,7 +413,6 @@ private async Task ProcessTableChangesAsync() { // In the future might make sense to retry executing the function, but for now we just let // another worker try. - this._logger.LogError($"Failed to trigger user function for table: '{this._userTable.FullName} due to exception: {result.Exception.GetType()}. Exception message: {result.Exception.Message}"); TelemetryInstance.TrackException(TelemetryErrorName.TriggerFunction, result.Exception, this._telemetryProps, measures); } this._state = State.Cleanup; @@ -441,7 +424,6 @@ private async Task ProcessTableChangesAsync() // any we still ensure everything is reset to a clean state await this.ClearRowsAsync(); } - this._logger.LogDebug("END ProcessTableChanges"); } /// @@ -450,7 +432,7 @@ private async Task ProcessTableChangesAsync() /// private async void RunLeaseRenewalLoopAsync() { - this._logger.LogInformation("Starting lease renewal loop."); + this._logger.LogDebug("Starting lease renewal loop."); try { @@ -458,9 +440,7 @@ private async void RunLeaseRenewalLoopAsync() using (var connection = new SqlConnection(this._connectionString)) { - this._logger.LogDebug("BEGIN OpenLeaseRenewalLoopConnection"); await connection.OpenAsync(token); - this._logger.LogDebug("END OpenLeaseRenewalLoopConnection"); bool forceReconnect = false; while (!token.IsCancellationRequested) @@ -509,9 +489,7 @@ private async void RunLeaseRenewalLoopAsync() private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken token) { - this._logger.LogDebug("BEGIN WaitRowsLock - RenewLeases"); await this._rowsLock.WaitAsync(token); - this._logger.LogDebug("END WaitRowsLock - RenewLeases"); if (this._state == State.ProcessingChanges && this._rowsToProcess.Count > 0) { @@ -522,13 +500,11 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken { using (SqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection, transaction)) { - this._logger.LogDebug($"BEGIN RenewLeases Query={renewLeasesCommand.CommandText}"); var stopwatch = Stopwatch.StartNew(); - int rowsAffected = await renewLeasesCommand.ExecuteNonQueryAsync(token); + int rowsAffected = await renewLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token); long durationMs = stopwatch.ElapsedMilliseconds; - this._logger.LogDebug($"END RenewLeases Duration={durationMs}ms RowsAffected={rowsAffected}"); if (rowsAffected > 0) { @@ -588,7 +564,6 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken } // Want to always release the lock at the end, even if renewing the leases failed. - this._logger.LogDebug("ReleaseRowsLock - RenewLeases"); this._rowsLock.Release(); } @@ -597,15 +572,12 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken /// private async Task ClearRowsAsync() { - this._logger.LogDebug("BEGIN WaitRowsLock - ClearRows"); await this._rowsLock.WaitAsync(); - this._logger.LogDebug("END WaitRowsLock - ClearRows"); this._leaseRenewalCount = 0; this._state = State.CheckingForChanges; this._rowsToProcess = new List>(); - this._logger.LogDebug("ReleaseRowsLock - ClearRows"); this._rowsLock.Release(); } @@ -632,22 +604,18 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke // Release the leases held on "_rowsToRelease". using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction)) { - this._logger.LogDebug($"BEGIN ReleaseLeases Query={releaseLeasesCommand.CommandText}"); var commandSw = Stopwatch.StartNew(); - int rowsUpdated = await releaseLeasesCommand.ExecuteNonQueryAsync(token); + int rowsUpdated = await releaseLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token); releaseLeasesDurationMs = commandSw.ElapsedMilliseconds; - this._logger.LogDebug($"END ReleaseLeases Duration={releaseLeasesDurationMs}ms RowsUpdated={rowsUpdated}"); } // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion, // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion. using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion)) { - this._logger.LogDebug($"BEGIN UpdateTablesPostInvocation Query={updateTablesPostInvocationCommand.CommandText}"); var commandSw = Stopwatch.StartNew(); - await updateTablesPostInvocationCommand.ExecuteNonQueryAsync(token); + await updateTablesPostInvocationCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token); updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds; - this._logger.LogDebug($"END UpdateTablesPostInvocation Duration={updateLastSyncVersionDurationMs}ms"); } transaction.Commit(); @@ -717,9 +685,7 @@ private long RecomputeLastSyncVersion() // the only version number in the set. // Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with // changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query. - long lastSyncVersion = changeVersionSet.ElementAt(changeVersionSet.Count > 1 ? changeVersionSet.Count - 2 : 0); - this._logger.LogDebug($"RecomputeLastSyncVersion. LastSyncVersion={lastSyncVersion} ChangeVersionSet={string.Join(",", changeVersionSet)}"); - return lastSyncVersion; + return changeVersionSet.ElementAt(changeVersionSet.Count > 1 ? changeVersionSet.Count - 2 : 0); } /// @@ -730,7 +696,6 @@ private long RecomputeLastSyncVersion() /// The list of changes private IReadOnlyList> ProcessChanges() { - this._logger.LogDebug("BEGIN ProcessChanges"); var changes = new List>(); foreach (IReadOnlyDictionary row in this._rowsToProcess) { @@ -744,7 +709,6 @@ private IReadOnlyList> ProcessChanges() changes.Add(new SqlChange(operation, Utils.JsonDeserializeObject(Utils.JsonSerializeObject(item)))); } - this._logger.LogDebug("END ProcessChanges"); return changes; } diff --git a/src/TriggerBinding/SqlTriggerListener.cs b/src/TriggerBinding/SqlTriggerListener.cs index 685c396e6..411dd766a 100644 --- a/src/TriggerBinding/SqlTriggerListener.cs +++ b/src/TriggerBinding/SqlTriggerListener.cs @@ -113,9 +113,7 @@ public async Task StartAsync(CancellationToken cancellationToken) { using (var connection = new SqlConnection(this._connectionString)) { - this._logger.LogDebug("BEGIN OpenListenerConnection"); await connection.OpenAsyncWithSqlErrorHandling(cancellationToken); - this._logger.LogDebug("END OpenListenerConnection"); ServerProperties serverProperties = await GetServerTelemetryProperties(connection, this._logger, cancellationToken); this._telemetryProps.AddConnectionProps(connection, serverProperties); @@ -140,8 +138,6 @@ public async Task StartAsync(CancellationToken cancellationToken) transaction.Commit(); } - this._logger.LogInformation($"Starting SQL trigger listener for table: '{this._userTable.FullName}' (object ID: {userTableId}), function ID: {this._userFunctionId}"); - this._changeMonitor = new SqlTableChangeMonitor( this._connectionString, userTableId, @@ -156,8 +152,7 @@ public async Task StartAsync(CancellationToken cancellationToken) this._telemetryProps); this._listenerState = ListenerStarted; - this._logger.LogInformation($"Started SQL trigger listener for table: '{this._userTable.FullName}' (object ID: {userTableId}), function ID: {this._userFunctionId}"); - this._logger.LogInformation($"SQL trigger Leases table: {leasesTableName}"); + this._logger.LogDebug($"Started SQL trigger listener for table: '{this._userTable.FullName}' (object ID: {userTableId}), function ID: {this._userFunctionId}, leases table: {leasesTableName}"); var measures = new Dictionary { @@ -197,7 +192,6 @@ public Task StopAsync(CancellationToken cancellationToken) this._changeMonitor.Dispose(); this._listenerState = ListenerStopped; - this._logger.LogInformation($"Stopped SQL trigger listener for table: '{this._userTable.FullName}', function ID: '{this._userFunctionId}'."); } var measures = new Dictionary @@ -225,9 +219,8 @@ FROM sys.columns AS c WHERE c.object_id = {userTableId}; "; - this._logger.LogDebug($"BEGIN GetUserTableColumns Query={getUserTableColumnsQuery}"); using (var getUserTableColumnsCommand = new SqlCommand(getUserTableColumnsQuery, connection)) - using (SqlDataReader reader = await getUserTableColumnsCommand.ExecuteReaderAsync(cancellationToken)) + using (SqlDataReader reader = await getUserTableColumnsCommand.ExecuteReaderAsyncWithLogging(this._logger, cancellationToken)) { var userTableColumns = new List(); var userDefinedTypeColumns = new List<(string name, string type)>(); @@ -261,7 +254,7 @@ FROM sys.columns AS c " Please rename them to be able to use trigger binding."); } - this._logger.LogDebug($"END GetUserTableColumns ColumnNames = {string.Join(", ", userTableColumns.Select(col => $"'{col}'"))}."); + this._logger.LogDebug($"GetUserTableColumns ColumnNames = {string.Join(", ", userTableColumns.Select(col => $"'{col}'"))}."); return userTableColumns; } } @@ -282,14 +275,13 @@ IF SCHEMA_ID(N'{SchemaName}') IS NULL EXEC ('CREATE SCHEMA {SchemaName}'); "; - this._logger.LogDebug($"BEGIN CreateSchema Query={createSchemaQuery}"); using (var createSchemaCommand = new SqlCommand(createSchemaQuery, connection, transaction)) { var stopwatch = Stopwatch.StartNew(); try { - await createSchemaCommand.ExecuteNonQueryAsync(cancellationToken); + await createSchemaCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); } catch (Exception ex) { @@ -308,9 +300,7 @@ IF SCHEMA_ID(N'{SchemaName}') IS NULL } } - long durationMs = stopwatch.ElapsedMilliseconds; - this._logger.LogDebug($"END CreateSchema Duration={durationMs}ms"); - return durationMs; + return stopwatch.ElapsedMilliseconds; } } @@ -335,13 +325,12 @@ PRIMARY KEY (UserFunctionID, UserTableID) ); "; - this._logger.LogDebug($"BEGIN CreateGlobalStateTable Query={createGlobalStateTableQuery}"); using (var createGlobalStateTableCommand = new SqlCommand(createGlobalStateTableQuery, connection, transaction)) { var stopwatch = Stopwatch.StartNew(); try { - await createGlobalStateTableCommand.ExecuteNonQueryAsync(cancellationToken); + await createGlobalStateTableCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); } catch (Exception ex) { @@ -359,9 +348,7 @@ PRIMARY KEY (UserFunctionID, UserTableID) throw; } } - long durationMs = stopwatch.ElapsedMilliseconds; - this._logger.LogDebug($"END CreateGlobalStateTable Duration={durationMs}ms"); - return durationMs; + return stopwatch.ElapsedMilliseconds; } } @@ -379,10 +366,8 @@ private async Task InsertGlobalStateTableRowAsync(SqlConnection connection string getMinValidVersionQuery = $"SELECT CHANGE_TRACKING_MIN_VALID_VERSION({userTableId});"; - this._logger.LogDebug($"BEGIN InsertGlobalStateTableRow"); - this._logger.LogDebug($"BEGIN GetMinValidVersion Query={getMinValidVersionQuery}"); using (var getMinValidVersionCommand = new SqlCommand(getMinValidVersionQuery, connection, transaction)) - using (SqlDataReader reader = await getMinValidVersionCommand.ExecuteReaderAsync(cancellationToken)) + using (SqlDataReader reader = await getMinValidVersionCommand.ExecuteReaderAsyncWithLogging(this._logger, cancellationToken)) { if (!await reader.ReadAsync(cancellationToken)) { @@ -396,7 +381,6 @@ private async Task InsertGlobalStateTableRowAsync(SqlConnection connection throw new InvalidOperationException($"Could not find change tracking enabled for table: '{this._userTable.FullName}'."); } } - this._logger.LogDebug($"END GetMinValidVersion MinValidVersion={minValidVersion}"); string insertRowGlobalStateTableQuery = $@" {AppLockStatements} @@ -409,15 +393,11 @@ INSERT INTO {GlobalStateTableName} VALUES ('{this._userFunctionId}', {userTableId}, {(long)minValidVersion}); "; - this._logger.LogDebug($"BEGIN InsertRowGlobalStateTableQuery Query={insertRowGlobalStateTableQuery}"); using (var insertRowGlobalStateTableCommand = new SqlCommand(insertRowGlobalStateTableQuery, connection, transaction)) { var stopwatch = Stopwatch.StartNew(); - await insertRowGlobalStateTableCommand.ExecuteNonQueryAsync(cancellationToken); - long durationMs = stopwatch.ElapsedMilliseconds; - this._logger.LogDebug($"END InsertRowGlobalStateTableQuery Duration={durationMs}ms"); - this._logger.LogDebug("END InsertGlobalStateTableRow"); - return durationMs; + await insertRowGlobalStateTableCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); + return stopwatch.ElapsedMilliseconds; } } @@ -453,13 +433,12 @@ PRIMARY KEY ({primaryKeys}) ); "; - this._logger.LogDebug($"BEGIN CreateLeasesTable Query={createLeasesTableQuery}"); using (var createLeasesTableCommand = new SqlCommand(createLeasesTableQuery, connection, transaction)) { var stopwatch = Stopwatch.StartNew(); try { - await createLeasesTableCommand.ExecuteNonQueryAsync(cancellationToken); + await createLeasesTableCommand.ExecuteNonQueryAsyncWithLogging(this._logger, cancellationToken); } catch (Exception ex) { @@ -478,7 +457,6 @@ PRIMARY KEY ({primaryKeys}) } } long durationMs = stopwatch.ElapsedMilliseconds; - this._logger.LogDebug($"END CreateLeasesTable Duration={durationMs}ms"); return durationMs; } } diff --git a/src/TriggerBinding/SqlTriggerMetricsProvider.cs b/src/TriggerBinding/SqlTriggerMetricsProvider.cs index 6e7fca3dd..e5c2ea990 100644 --- a/src/TriggerBinding/SqlTriggerMetricsProvider.cs +++ b/src/TriggerBinding/SqlTriggerMetricsProvider.cs @@ -52,9 +52,7 @@ private async Task GetUnprocessedChangeCountAsync() { using (var connection = new SqlConnection(this._connectionString)) { - this._logger.LogDebug("BEGIN OpenGetUnprocessedChangesConnection"); await connection.OpenAsync(); - this._logger.LogDebug("END OpenGetUnprocessedChangesConnection"); int userTableId = await GetUserTableIdAsync(connection, this._userTable.FullName, this._logger, CancellationToken.None); IReadOnlyList<(string name, string type)> primaryKeyColumns = await GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, CancellationToken.None); @@ -66,13 +64,11 @@ private async Task GetUnprocessedChangeCountAsync() { using (SqlCommand getUnprocessedChangesCommand = this.BuildGetUnprocessedChangesCommand(connection, transaction, primaryKeyColumns, userTableId)) { - this._logger.LogDebug($"BEGIN GetUnprocessedChangeCount Query={getUnprocessedChangesCommand.CommandText}"); var commandSw = Stopwatch.StartNew(); - unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsync(); + unprocessedChangeCount = (long)await getUnprocessedChangesCommand.ExecuteScalarAsyncWithLogging(this._logger, CancellationToken.None); getUnprocessedChangesDurationMs = commandSw.ElapsedMilliseconds; } - this._logger.LogDebug($"END GetUnprocessedChangeCount Duration={getUnprocessedChangesDurationMs}ms Count={unprocessedChangeCount}"); transaction.Commit(); } catch (Exception) diff --git a/src/TriggerBinding/SqlTriggerScaleMonitor.cs b/src/TriggerBinding/SqlTriggerScaleMonitor.cs index 93d69e990..24f3435a8 100644 --- a/src/TriggerBinding/SqlTriggerScaleMonitor.cs +++ b/src/TriggerBinding/SqlTriggerScaleMonitor.cs @@ -112,7 +112,6 @@ private ScaleStatus GetScaleStatusCore(int workerCount, SqlTriggerMetrics[] metr // Do not make a scale decision unless we have enough samples. if (metrics is null || (metrics.Length < minSamplesForScaling)) { - this._logger.LogInformation($"Requesting no-scaling: Insufficient metrics for making scale decision for table: '{this._userTable.FullName}'."); return status; } @@ -120,14 +119,13 @@ private ScaleStatus GetScaleStatusCore(int workerCount, SqlTriggerMetrics[] metr metrics = metrics.TakeLast(minSamplesForScaling).ToArray(); string counts = string.Join(", ", metrics.Select(metric => metric.UnprocessedChangeCount)); - this._logger.LogDebug($"Unprocessed change counts: [{counts}], worker count: {workerCount}, maximum changes per worker: {this._maxChangesPerWorker}."); // Add worker if the count of unprocessed changes per worker exceeds the maximum limit. long lastUnprocessedChangeCount = metrics.Last().UnprocessedChangeCount; if (lastUnprocessedChangeCount > workerCount * this._maxChangesPerWorker) { status.Vote = ScaleVote.ScaleOut; - this._logger.LogInformation($"Requesting scale-out: Found too many unprocessed changes: {lastUnprocessedChangeCount} for table: '{this._userTable.FullName}' relative to the number of workers."); + this._logger.LogDebug($"Requesting scale-out: Found too many unprocessed changes: {lastUnprocessedChangeCount} for table: '{this._userTable.FullName}' relative to the number of workers."); return status; } @@ -150,15 +148,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, SqlTriggerMetrics[] metr if (expectedUnprocessedChangeCount > workerCount * this._maxChangesPerWorker) { status.Vote = ScaleVote.ScaleOut; - this._logger.LogInformation($"Requesting scale-out: Found the unprocessed changes for table: '{this._userTable.FullName}' to be continuously increasing" + + this._logger.LogDebug($"Requesting scale-out: Found the unprocessed changes for table: '{this._userTable.FullName}' to be continuously increasing" + " and may exceed the maximum limit set for the workers."); return status; } - else - { - this._logger.LogDebug($"Avoiding scale-out: Found the unprocessed changes: {lastUnprocessedChangeCount} for table: '{this._userTable.FullName}' to be increasing" + - " but they may not exceed the maximum limit set for the workers."); - } } if (isDecreasing) @@ -167,17 +160,10 @@ private ScaleStatus GetScaleStatusCore(int workerCount, SqlTriggerMetrics[] metr if (lastUnprocessedChangeCount <= (workerCount - 1) * this._maxChangesPerWorker) { status.Vote = ScaleVote.ScaleIn; - this._logger.LogInformation($"Requesting scale-in: Found table: '{this._userTable.FullName}' to be either idle or the unprocessed changes to be continuously decreasing."); + this._logger.LogDebug($"Requesting scale-in: Found table: '{this._userTable.FullName}' to be either idle or the unprocessed changes to be continuously decreasing."); return status; } - else - { - this._logger.LogDebug($"Avoiding scale-in: Found the unprocessed changes for table: '{this._userTable.FullName}' to be decreasing" + - " but they are high enough to require all existing workers for processing."); - } } - - this._logger.LogInformation($"Requesting no-scaling: Found the number of unprocessed changes for table: '{this._userTable.FullName}' to not require scaling."); return status; } } diff --git a/src/TriggerBinding/SqlTriggerTargetScaler.cs b/src/TriggerBinding/SqlTriggerTargetScaler.cs index 5b332131c..b56b99625 100644 --- a/src/TriggerBinding/SqlTriggerTargetScaler.cs +++ b/src/TriggerBinding/SqlTriggerTargetScaler.cs @@ -12,17 +12,11 @@ namespace Microsoft.Azure.WebJobs.Extensions.Sql /// internal sealed class SqlTriggerTargetScaler : ITargetScaler { - private readonly string _userFunctionId; - private readonly string _userTableName; - private readonly ILogger _logger; private readonly SqlTriggerMetricsProvider _metricsProvider; private readonly int _maxChangesPerWorker; public SqlTriggerTargetScaler(string userFunctionId, string userTableName, string connectionString, int maxChangesPerWorker, ILogger logger) { - this._userTableName = !string.IsNullOrEmpty(userTableName) ? userTableName : throw new ArgumentNullException(userTableName); - this._userFunctionId = !string.IsNullOrEmpty(userFunctionId) ? userFunctionId : throw new ArgumentNullException(userFunctionId); - this._logger = logger ?? throw new ArgumentNullException(nameof(logger)); this._metricsProvider = new SqlTriggerMetricsProvider(connectionString, logger, new SqlObject(userTableName), userFunctionId); this.TargetScalerDescriptor = new TargetScalerDescriptor(userFunctionId); this._maxChangesPerWorker = maxChangesPerWorker; @@ -37,10 +31,10 @@ public async Task GetScaleResultAsync(TargetScalerContext co // Instance concurrency value is set by the functions host when dynamic concurrency is enabled. See https://learn.microsoft.com/en-us/azure/azure-functions/functions-concurrency for more details. int concurrency = context.InstanceConcurrency ?? this._maxChangesPerWorker; - return this.GetScaleResultInternal(concurrency, metrics.UnprocessedChangeCount); + return GetScaleResultInternal(concurrency, metrics.UnprocessedChangeCount); } - internal TargetScalerResult GetScaleResultInternal(int concurrency, long unprocessedChangeCount) + internal static TargetScalerResult GetScaleResultInternal(int concurrency, long unprocessedChangeCount) { if (concurrency < 1) { @@ -49,8 +43,6 @@ internal TargetScalerResult GetScaleResultInternal(int concurrency, long unproce int targetWorkerCount = (int)Math.Ceiling(unprocessedChangeCount / (decimal)concurrency); - this._logger.LogInformation($"Target worker count for function '{this._userFunctionId}' is '{targetWorkerCount}' TableName ='{this._userTableName}', UnprocessedChangeCount ='{unprocessedChangeCount}', Concurrency='{concurrency}')."); - return new TargetScalerResult { TargetWorkerCount = targetWorkerCount diff --git a/src/TriggerBinding/SqlTriggerUtils.cs b/src/TriggerBinding/SqlTriggerUtils.cs index ac3e38376..89066ca0f 100644 --- a/src/TriggerBinding/SqlTriggerUtils.cs +++ b/src/TriggerBinding/SqlTriggerUtils.cs @@ -41,9 +41,8 @@ FROM sys.indexes AS i INNER JOIN sys.types AS t ON c.user_type_id = t.user_type_id WHERE i.is_primary_key = 1 AND i.object_id = {userTableId}; "; - logger.LogDebug($"BEGIN GetPrimaryKeyColumns Query={getPrimaryKeyColumnsQuery}"); using (var getPrimaryKeyColumnsCommand = new SqlCommand(getPrimaryKeyColumnsQuery, connection)) - using (SqlDataReader reader = await getPrimaryKeyColumnsCommand.ExecuteReaderAsync(cancellationToken)) + using (SqlDataReader reader = await getPrimaryKeyColumnsCommand.ExecuteReaderAsyncWithLogging(logger, cancellationToken)) { string[] variableLengthTypes = new[] { "varchar", "nvarchar", "nchar", "char", "binary", "varbinary" }; string[] variablePrecisionTypes = new[] { "numeric", "decimal" }; @@ -77,7 +76,7 @@ FROM sys.indexes AS i throw new InvalidOperationException($"Could not find primary key created in table: '{userTableName}'."); } - logger.LogDebug($"END GetPrimaryKeyColumns ColumnNames(types) = {string.Join(", ", primaryKeyColumns.Select(col => $"'{col.name}({col.type})'"))}."); + logger.LogDebug($"GetPrimaryKeyColumns ColumnNames(types) = {string.Join(", ", primaryKeyColumns.Select(col => $"'{col.name}({col.type})'"))}."); return primaryKeyColumns; } } @@ -95,9 +94,8 @@ public static async Task GetUserTableIdAsync(SqlConnection connection, stri var userTable = new SqlObject(userTableName); string getObjectIdQuery = $"SELECT OBJECT_ID(N{userTable.QuotedFullName}, 'U');"; - logger.LogDebug($"BEGIN GetUserTableId Query={getObjectIdQuery}"); using (var getObjectIdCommand = new SqlCommand(getObjectIdQuery, connection)) - using (SqlDataReader reader = await getObjectIdCommand.ExecuteReaderAsync(cancellationToken)) + using (SqlDataReader reader = await getObjectIdCommand.ExecuteReaderAsyncWithLogging(logger, cancellationToken)) { if (!await reader.ReadAsync(cancellationToken)) { @@ -110,7 +108,7 @@ public static async Task GetUserTableIdAsync(SqlConnection connection, stri { throw new InvalidOperationException($"Could not find table: '{userTableName}'."); } - logger.LogDebug($"END GetUserTableId TableId={userTableId}"); + logger.LogDebug($"GetUserTableId TableId={userTableId}"); return (int)userTableId; } } diff --git a/test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs b/test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs index 828f2df84..a2f37abb6 100644 --- a/test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs +++ b/test/Unit/TriggerBinding/SqlTriggerScaleMonitorTests.cs @@ -38,13 +38,12 @@ public void ScaleMonitorDescriptor_ReturnsExpectedValue(string tableName, string [InlineData(new int[] { 1000, 1000, 1000, 1000 })] // metrics.Length == 4. public void ScaleMonitorGetScaleStatus_InsufficentMetrics_ReturnsNone(int[] unprocessedChangeCounts) { - (IScaleMonitor monitor, List logMessages) = GetScaleMonitor(); + (IScaleMonitor monitor, List _) = GetScaleMonitor(); ScaleStatusContext context = GetScaleStatusContext(unprocessedChangeCounts, 0); ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - Assert.Contains("Requesting no-scaling: Insufficient metrics for making scale decision for table: 'testTableName'.", logMessages); } /// @@ -94,13 +93,12 @@ public void ScaleMonitorGetScaleStatus_LastCountAboveLimit_ReturnsScaleOut(int[] [InlineData(new int[] { 0, 0, 0, 0, 10000 }, 10)] public void ScaleMonitorGetScaleStatus_LastCountBelowLimit_ReturnsNone(int[] unprocessedChangeCounts, int workerCount) { - (IScaleMonitor monitor, List logMessages) = GetScaleMonitor(); + (IScaleMonitor monitor, List _) = GetScaleMonitor(); ScaleStatusContext context = GetScaleStatusContext(unprocessedChangeCounts, workerCount); ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - Assert.Contains("Requesting no-scaling: Found the number of unprocessed changes for table: 'testTableName' to not require scaling.", logMessages); } /// @@ -132,13 +130,12 @@ public void ScaleMonitorGetScaleStatus_CountIncreasingAboveLimit_ReturnsScaleOut [InlineData(new int[] { 0, 1, 5000, 5001, 7500 }, 10)] public void ScaleMonitorGetScaleStatus_CountIncreasingBelowLimit_ReturnsNone(int[] unprocessedChangeCounts, int workerCount) { - (IScaleMonitor monitor, List logMessages) = GetScaleMonitor(); + (IScaleMonitor monitor, List _) = GetScaleMonitor(); ScaleStatusContext context = GetScaleStatusContext(unprocessedChangeCounts, workerCount); ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - Assert.Contains($"Avoiding scale-out: Found the unprocessed changes: {unprocessedChangeCounts.Last()} for table: 'testTableName' to be increasing but they may not exceed the maximum limit set for the workers.", string.Join(" ", logMessages)); } /// @@ -170,13 +167,12 @@ public void ScaleMonitorGetScaleStatus_CountDecreasingBelowLimit_ReturnsScaleIn( [InlineData(new int[] { 9005, 9004, 9003, 9002, 9001 }, 10)] public void ScaleMonitorGetScaleStatus_CountDecreasingAboveLimit_ReturnsNone(int[] unprocessedChangeCounts, int workerCount) { - (IScaleMonitor monitor, List logMessages) = GetScaleMonitor(); + (IScaleMonitor monitor, List _) = GetScaleMonitor(); ScaleStatusContext context = GetScaleStatusContext(unprocessedChangeCounts, workerCount); ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - Assert.Contains("Avoiding scale-in: Found the unprocessed changes for table: 'testTableName' to be decreasing but they are high enough to require all existing workers for processing.", string.Join(" ", logMessages)); } /// @@ -188,17 +184,12 @@ public void ScaleMonitorGetScaleStatus_CountDecreasingAboveLimit_ReturnsNone(int [InlineData(new int[] { 1, 1, 0, 0, 0 }, 10)] public void ScaleMonitorGetScaleStatus_CountNotIncreasingOrDecreasing_ReturnsNone(int[] unprocessedChangeCounts, int workerCount) { - (IScaleMonitor monitor, List logMessages) = GetScaleMonitor(); + (IScaleMonitor monitor, List _) = GetScaleMonitor(); ScaleStatusContext context = GetScaleStatusContext(unprocessedChangeCounts, workerCount); ScaleStatus scaleStatus = monitor.GetScaleStatus(context); Assert.Equal(ScaleVote.None, scaleStatus.Vote); - - // Ensure that no-scaling was not requested because of other conditions. - Assert.DoesNotContain("Avoiding scale-out: Found the unprocessed changes for table: 'testTableName' to be increasing but they may not exceed the maximum limit set for the workers.", logMessages); - Assert.DoesNotContain("Avoiding scale-in: Found the unprocessed changes for table: 'testTableName' to be decreasing but they are high enough to require all existing workers for processing.", logMessages); - Assert.Contains("Requesting no-scaling: Found the number of unprocessed changes for table: 'testTableName' to not require scaling.", logMessages); } [Theory] diff --git a/test/Unit/TriggerBinding/SqlTriggerTargetScalerTests.cs b/test/Unit/TriggerBinding/SqlTriggerTargetScalerTests.cs index edef5f6c2..7787d4c65 100644 --- a/test/Unit/TriggerBinding/SqlTriggerTargetScalerTests.cs +++ b/test/Unit/TriggerBinding/SqlTriggerTargetScalerTests.cs @@ -2,8 +2,6 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using Microsoft.Azure.WebJobs.Host.Scale; -using Microsoft.Extensions.Logging; -using Moq; using Xunit; namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Unit @@ -20,15 +18,7 @@ public class SqlTriggerTargetScalerTests [InlineData(100, null, 1)] public void SqlTriggerTargetScaler_Returns_Expected(int unprocessedChangeCount, int? concurrency, int expected) { - var targetScaler = new SqlTriggerTargetScaler( - "testUserFunctionId", - "testUserTableName", - "testConnectionString", - SqlTriggerListener.DefaultMaxChangesPerWorker, - Mock.Of() - ); - - TargetScalerResult result = targetScaler.GetScaleResultInternal(concurrency ?? SqlTriggerListener.DefaultMaxChangesPerWorker, unprocessedChangeCount); + TargetScalerResult result = SqlTriggerTargetScaler.GetScaleResultInternal(concurrency ?? SqlTriggerListener.DefaultMaxChangesPerWorker, unprocessedChangeCount); Assert.Equal(result.TargetWorkerCount, expected); }