Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/samples-csharp/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
}
},
"logLevel": {
"default": "Information"
"default": "Debug"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we use debug logging for more stuff I had to change this for tests. We originally had it as Information so customers didn't copy the debug settings, but in this case I think it's fine to have that vs. adding some functionality to switch the log level at runtime (which I had looked into earlier and didn't see an obvious way to do)

}
}
}
2 changes: 1 addition & 1 deletion src/SqlBindingConfigProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private static void LogDependentAssemblyVersions(ILogger logger)
{
try
{
logger.LogInformation($"Using {assembly.GetName().Name} {FileVersionInfo.GetVersionInfo(assembly.Location).ProductVersion}");
logger.LogDebug($"Using {assembly.GetName().Name} {FileVersionInfo.GetVersionInfo(assembly.Location).ProductVersion}");
}
catch (Exception ex)
{
Expand Down
20 changes: 20 additions & 0 deletions src/SqlBindingUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,26 @@ public static async Task<ServerProperties> GetServerTelemetryProperties(SqlConne
return null;
}

/// <summary>
/// Calls ExecuteScalarAsync and logs an error if it fails before rethrowing.
/// </summary>
/// <param name="cmd">The SqlCommand being executed</param>
/// <param name="logger">The logger</param>
/// <param name="cancellationToken">The cancellation token to pass to the call</param>
/// <returns>The result of the call</returns>
public static async Task<object> ExecuteScalarAsyncWithLogging(this SqlCommand cmd, ILogger logger, CancellationToken cancellationToken)
{
try
{
return await cmd.ExecuteScalarAsync(cancellationToken);
}
catch (Exception e)
{
logger.LogError($"Exception executing query. Message={e.Message}\nQuery={cmd.CommandText}");
throw;
}
}

/// <summary>
/// Calls ExecuteNonQueryAsync and logs an error if it fails before rethrowing.
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion src/SqlConverters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public virtual async Task<string> BuildItemFromAttributeAsync(SqlAttribute attri
TelemetryInstance.TrackConvert(type, props);
var dataTable = new DataTable();
adapter.Fill(dataTable);
this._logger.LogInformation($"{dataTable.Rows.Count} row(s) queried from database: {connection.Database} using Command: {command.CommandText}");
// Serialize any DateTime objects in UTC format
var jsonSerializerSettings = new JsonSerializerSettings()
{
Expand Down
1 change: 0 additions & 1 deletion src/Telemetry/Telemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public void Initialize(IConfiguration config, ILogger logger)
this.Enabled = !(Utils.GetEnvironmentVariableAsBool(TelemetryOptoutEnvVar) || Utils.GetConfigSettingAsBool(TelemetryOptoutSetting, config));
if (!this.Enabled)
{
this._logger.LogInformation("Telemetry disabled");
return;
}
this._logger.LogInformation(WelcomeMessage);
Expand Down
52 changes: 8 additions & 44 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,15 @@ public void Dispose()
/// </summary>
private async Task RunChangeConsumptionLoopAsync()
{
this._logger.LogInformation($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}");
this._logger.LogDebug($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}");

try
{
CancellationToken token = this._cancellationTokenSourceCheckForChanges.Token;

using (var connection = new SqlConnection(this._connectionString))
{
this._logger.LogDebug("BEGIN OpenChangeConsumptionConnection");
await connection.OpenAsync(token);
this._logger.LogDebug("END OpenChangeConsumptionConnection");

bool forceReconnect = false;
// Check for cancellation request only after a cycle of checking and processing of changes completes.
Expand All @@ -216,7 +214,6 @@ private async Task RunChangeConsumptionLoopAsync()
{
forceReconnect = false;
}
this._logger.LogDebug($"BEGIN ProcessingChanges State={this._state}");

try
{
Expand All @@ -243,8 +240,6 @@ private async Task RunChangeConsumptionLoopAsync()
this._logger.LogError($"Fatal SQL Client exception processing changes. Will attempt to reestablish connection in {this._pollingIntervalInMs}ms. Exception = {e.Message}");
forceReconnect = true;
}
this._logger.LogDebug("END ProcessingChanges");
this._logger.LogDebug($"Delaying for {this._pollingIntervalInMs}ms");
await Task.Delay(TimeSpan.FromMilliseconds(this._pollingIntervalInMs), token);
}
}
Expand Down Expand Up @@ -276,7 +271,6 @@ private async Task RunChangeConsumptionLoopAsync()
/// </summary>
private async Task GetTableChangesAsync(SqlConnection connection, CancellationToken token)
{
this._logger.LogDebug("BEGIN GetTableChanges");
try
{
var transactionSw = Stopwatch.StartNew();
Expand All @@ -289,19 +283,16 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
// Update the version number stored in the global state table if necessary before using it.
using (SqlCommand updateTablesPreInvocationCommand = this.BuildUpdateTablesPreInvocation(connection, transaction))
{
this._logger.LogDebug($"BEGIN UpdateTablesPreInvocation Query={updateTablesPreInvocationCommand.CommandText}");
var commandSw = Stopwatch.StartNew();
await updateTablesPreInvocationCommand.ExecuteNonQueryAsync(token);
await updateTablesPreInvocationCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
setLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds;
}
this._logger.LogDebug($"END UpdateTablesPreInvocation Duration={setLastSyncVersionDurationMs}ms");

var rows = new List<IReadOnlyDictionary<string, object>>();

// Use the version number to query for new changes.
using (SqlCommand getChangesCommand = this.BuildGetChangesCommand(connection, transaction))
{
this._logger.LogDebug($"BEGIN GetChanges Query={getChangesCommand.CommandText}");
var commandSw = Stopwatch.StartNew();

using (SqlDataReader reader = await getChangesCommand.ExecuteReaderAsync(token))
Expand All @@ -314,19 +305,16 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo

getChangesDurationMs = commandSw.ElapsedMilliseconds;
}
this._logger.LogDebug($"END GetChanges Duration={getChangesDurationMs}ms ChangedRows={rows.Count}");

// If changes were found, acquire leases on them.
if (rows.Count > 0)
{
using (SqlCommand acquireLeasesCommand = this.BuildAcquireLeasesCommand(connection, transaction, rows))
{
this._logger.LogDebug($"BEGIN AcquireLeases Query={acquireLeasesCommand.CommandText}");
var commandSw = Stopwatch.StartNew();
await acquireLeasesCommand.ExecuteNonQueryAsync(token);
await acquireLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
acquireLeasesDurationMs = commandSw.ElapsedMilliseconds;
}
this._logger.LogDebug($"END AcquireLeases Duration={acquireLeasesDurationMs}ms");

// Only send event if we got changes to reduce the overall number of events sent since we generally
// only care about the times that we had to actually retrieve and process rows
Expand Down Expand Up @@ -375,12 +363,10 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
throw;
}
}
this._logger.LogDebug("END GetTableChanges");
}

private async Task ProcessTableChangesAsync()
{
this._logger.LogDebug("BEGIN ProcessTableChanges");
if (this._rowsToProcess.Count > 0)
{
IReadOnlyList<SqlChange<T>> changes = null;
Expand All @@ -407,7 +393,6 @@ private async Task ProcessTableChangesAsync()
{
var input = new TriggeredFunctionData() { TriggerValue = changes };

this._logger.LogDebug("Executing triggered function");
var stopwatch = Stopwatch.StartNew();

FunctionResult result = await this._executor.TryExecuteAsync(input, this._cancellationTokenSourceExecutor.Token);
Expand All @@ -419,7 +404,6 @@ private async Task ProcessTableChangesAsync()
};
if (result.Succeeded)
{
this._logger.LogDebug($"Successfully triggered function. Duration={durationMs}ms");
TelemetryInstance.TrackEvent(TelemetryEventName.TriggerFunction, this._telemetryProps, measures);
// We've successfully fully processed these so set them to be released in the cleanup phase
this._rowsToRelease = this._rowsToProcess;
Expand All @@ -429,7 +413,6 @@ private async Task ProcessTableChangesAsync()
{
// In the future might make sense to retry executing the function, but for now we just let
// another worker try.
this._logger.LogError($"Failed to trigger user function for table: '{this._userTable.FullName} due to exception: {result.Exception.GetType()}. Exception message: {result.Exception.Message}");
TelemetryInstance.TrackException(TelemetryErrorName.TriggerFunction, result.Exception, this._telemetryProps, measures);
}
this._state = State.Cleanup;
Expand All @@ -441,7 +424,6 @@ private async Task ProcessTableChangesAsync()
// any we still ensure everything is reset to a clean state
await this.ClearRowsAsync();
}
this._logger.LogDebug("END ProcessTableChanges");
}

/// <summary>
Expand All @@ -450,17 +432,15 @@ private async Task ProcessTableChangesAsync()
/// </summary>
private async void RunLeaseRenewalLoopAsync()
{
this._logger.LogInformation("Starting lease renewal loop.");
this._logger.LogDebug("Starting lease renewal loop.");

try
{
CancellationToken token = this._cancellationTokenSourceRenewLeases.Token;

using (var connection = new SqlConnection(this._connectionString))
{
this._logger.LogDebug("BEGIN OpenLeaseRenewalLoopConnection");
await connection.OpenAsync(token);
this._logger.LogDebug("END OpenLeaseRenewalLoopConnection");

bool forceReconnect = false;
while (!token.IsCancellationRequested)
Expand Down Expand Up @@ -509,9 +489,7 @@ private async void RunLeaseRenewalLoopAsync()

private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken token)
{
this._logger.LogDebug("BEGIN WaitRowsLock - RenewLeases");
await this._rowsLock.WaitAsync(token);
this._logger.LogDebug("END WaitRowsLock - RenewLeases");

if (this._state == State.ProcessingChanges && this._rowsToProcess.Count > 0)
{
Expand All @@ -522,13 +500,11 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
{
using (SqlCommand renewLeasesCommand = this.BuildRenewLeasesCommand(connection, transaction))
{
this._logger.LogDebug($"BEGIN RenewLeases Query={renewLeasesCommand.CommandText}");
var stopwatch = Stopwatch.StartNew();

int rowsAffected = await renewLeasesCommand.ExecuteNonQueryAsync(token);
int rowsAffected = await renewLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);

long durationMs = stopwatch.ElapsedMilliseconds;
this._logger.LogDebug($"END RenewLeases Duration={durationMs}ms RowsAffected={rowsAffected}");

if (rowsAffected > 0)
{
Expand Down Expand Up @@ -588,7 +564,6 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
}

// Want to always release the lock at the end, even if renewing the leases failed.
this._logger.LogDebug("ReleaseRowsLock - RenewLeases");
this._rowsLock.Release();
}

Expand All @@ -597,15 +572,12 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
/// </summary>
private async Task ClearRowsAsync()
{
this._logger.LogDebug("BEGIN WaitRowsLock - ClearRows");
await this._rowsLock.WaitAsync();
this._logger.LogDebug("END WaitRowsLock - ClearRows");

this._leaseRenewalCount = 0;
this._state = State.CheckingForChanges;
this._rowsToProcess = new List<IReadOnlyDictionary<string, object>>();

this._logger.LogDebug("ReleaseRowsLock - ClearRows");
this._rowsLock.Release();
}

Expand All @@ -632,22 +604,18 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke
// Release the leases held on "_rowsToRelease".
using (SqlCommand releaseLeasesCommand = this.BuildReleaseLeasesCommand(connection, transaction))
{
this._logger.LogDebug($"BEGIN ReleaseLeases Query={releaseLeasesCommand.CommandText}");
var commandSw = Stopwatch.StartNew();
int rowsUpdated = await releaseLeasesCommand.ExecuteNonQueryAsync(token);
int rowsUpdated = await releaseLeasesCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
releaseLeasesDurationMs = commandSw.ElapsedMilliseconds;
this._logger.LogDebug($"END ReleaseLeases Duration={releaseLeasesDurationMs}ms RowsUpdated={rowsUpdated}");
}

// Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion,
// and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion.
using (SqlCommand updateTablesPostInvocationCommand = this.BuildUpdateTablesPostInvocation(connection, transaction, newLastSyncVersion))
{
this._logger.LogDebug($"BEGIN UpdateTablesPostInvocation Query={updateTablesPostInvocationCommand.CommandText}");
var commandSw = Stopwatch.StartNew();
await updateTablesPostInvocationCommand.ExecuteNonQueryAsync(token);
await updateTablesPostInvocationCommand.ExecuteNonQueryAsyncWithLogging(this._logger, token);
updateLastSyncVersionDurationMs = commandSw.ElapsedMilliseconds;
this._logger.LogDebug($"END UpdateTablesPostInvocation Duration={updateLastSyncVersionDurationMs}ms");
}
transaction.Commit();

Expand Down Expand Up @@ -717,9 +685,7 @@ private long RecomputeLastSyncVersion()
// the only version number in the set.
// Also this LastSyncVersion is actually updated in the GlobalState table only after verifying that the changes with

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove / Change comment since you removed the lastSyncVersion var.

// changeVersion <= newLastSyncVersion have been processed in BuildUpdateTablesPostInvocation query.
long lastSyncVersion = changeVersionSet.ElementAt(changeVersionSet.Count > 1 ? changeVersionSet.Count - 2 : 0);
this._logger.LogDebug($"RecomputeLastSyncVersion. LastSyncVersion={lastSyncVersion} ChangeVersionSet={string.Join(",", changeVersionSet)}");
return lastSyncVersion;
return changeVersionSet.ElementAt(changeVersionSet.Count > 1 ? changeVersionSet.Count - 2 : 0);
}

/// <summary>
Expand All @@ -730,7 +696,6 @@ private long RecomputeLastSyncVersion()
/// <returns>The list of changes</returns>
private IReadOnlyList<SqlChange<T>> ProcessChanges()
{
this._logger.LogDebug("BEGIN ProcessChanges");
var changes = new List<SqlChange<T>>();
foreach (IReadOnlyDictionary<string, object> row in this._rowsToProcess)
{
Expand All @@ -744,7 +709,6 @@ private IReadOnlyList<SqlChange<T>> ProcessChanges()

changes.Add(new SqlChange<T>(operation, Utils.JsonDeserializeObject<T>(Utils.JsonSerializeObject(item))));
}
this._logger.LogDebug("END ProcessChanges");
return changes;
}

Expand Down
Loading