Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions src/SqlAsyncCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ public SqlAsyncCollector(IConfiguration configuration, SqlAttribute attribute, I
TelemetryInstance.TrackCreate(CreateType.SqlAsyncCollector);
using (SqlConnection connection = BuildConnection(attribute.ConnectionStringSetting, configuration))
{
this._logger.LogDebugWithThreadId("BEGIN OpenSqlAsyncCollectorVerifyDatabaseSupportedConnection");
connection.OpenAsyncWithSqlErrorHandling(CancellationToken.None).Wait();
this._logger.LogDebugWithThreadId("END OpenSqlAsyncCollectorVerifyDatabaseSupportedConnection");
VerifyDatabaseSupported(connection, logger, CancellationToken.None).Wait();
}
}
Expand Down Expand Up @@ -172,13 +170,10 @@ public async Task FlushAsync(CancellationToken cancellationToken = default)
/// <param name="configuration"> Used to build up the connection </param>
private async Task UpsertRowsAsync(IList<T> rows, SqlAttribute attribute, IConfiguration configuration)
{
this._logger.LogDebugWithThreadId("BEGIN UpsertRowsAsync");
var upsertRowsAsyncSw = Stopwatch.StartNew();
using (SqlConnection connection = BuildConnection(attribute.ConnectionStringSetting, configuration))
{
this._logger.LogDebugWithThreadId("BEGIN OpenUpsertRowsAsyncConnection");
await connection.OpenAsync();
this._logger.LogDebugWithThreadId("END OpenUpsertRowsAsyncConnection");
this._serverProperties = await GetServerTelemetryProperties(connection, this._logger, CancellationToken.None);
Dictionary<TelemetryPropertyName, string> props = connection.AsConnectionProps(this._serverProperties);

Expand All @@ -196,7 +191,7 @@ private async Task UpsertRowsAsync(IList<T> rows, SqlAttribute attribute, IConfi
{
if (int.TryParse(timeoutEnvVar, NumberStyles.Integer, CultureInfo.InvariantCulture, out timeout))
{
this._logger.LogDebugWithThreadId($"Overriding default table info cache timeout with new value {timeout}");
this._logger.LogDebug($"Overriding default table info cache timeout with new value {timeout}");
}
else
{
Expand Down Expand Up @@ -253,7 +248,6 @@ private async Task UpsertRowsAsync(IList<T> rows, SqlAttribute attribute, IConfi
string mergeOrInsertQuery = tableInfo.QueryType == QueryType.Insert ? TableInformation.GetInsertQuery(table, bracketedColumnNamesFromItem) :
TableInformation.GetMergeQuery(tableInfo.PrimaryKeys, table, bracketedColumnNamesFromItem);

this._logger.LogDebugWithThreadId("BEGIN UpsertRowsTransaction");
var transactionSw = Stopwatch.StartNew();
int batchSize = 1000;
SqlTransaction transaction = connection.BeginTransaction();
Expand All @@ -270,9 +264,8 @@ private async Task UpsertRowsAsync(IList<T> rows, SqlAttribute attribute, IConfi
batchCount++;
GenerateDataQueryForMerge(tableInfo, batch, out string newDataQuery, out string rowData);
command.CommandText = $"{newDataQuery} {mergeOrInsertQuery};";
this._logger.LogDebugWithThreadId($"UpsertRowsTransactionBatch - Query={command.CommandText}");
par.Value = rowData;
await command.ExecuteNonQueryAsync();
await command.ExecuteNonQueryAsyncWithLogging(this._logger, CancellationToken.None);
}
transaction.Commit();
transactionSw.Stop();
Expand All @@ -286,8 +279,6 @@ private async Task UpsertRowsAsync(IList<T> rows, SqlAttribute attribute, IConfi
{ TelemetryMeasureName.NumRows, rows.Count }
};
TelemetryInstance.TrackEvent(TelemetryEventName.Upsert, props, measures);
this._logger.LogDebugWithThreadId($"END UpsertRowsTransaction Duration={transactionSw.ElapsedMilliseconds}ms Upserted {rows.Count} row(s) into database: {connection.Database} and table: {fullTableName}.");
this._logger.LogDebugWithThreadId($"END UpsertRowsAsync Duration={upsertRowsAsyncSw.ElapsedMilliseconds}ms");
}
catch (Exception ex)
{
Expand Down Expand Up @@ -562,7 +553,6 @@ WHEN NOT MATCHED THEN
public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConnection sqlConnection, string fullName, ILogger logger, IEnumerable<string> objectColumnNames, ServerProperties serverProperties)
{
Dictionary<TelemetryPropertyName, string> sqlConnProps = sqlConnection.AsConnectionProps(serverProperties);
logger.LogDebugWithThreadId("BEGIN RetrieveTableInformationAsync");
var table = new SqlObject(fullName);

var tableInfoSw = Stopwatch.StartNew();
Expand All @@ -573,9 +563,8 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
try
{
string getColumnDefinitionsQuery = GetColumnDefinitionsQuery(table);
logger.LogDebugWithThreadId($"BEGIN GetColumnDefinitions Query=\"{getColumnDefinitionsQuery}\"");
var cmdColDef = new SqlCommand(getColumnDefinitionsQuery, sqlConnection);
using (SqlDataReader rdr = await cmdColDef.ExecuteReaderAsync())
using (SqlDataReader rdr = await cmdColDef.ExecuteReaderAsyncWithLogging(logger, CancellationToken.None))
{
while (await rdr.ReadAsync())
{
Expand All @@ -584,7 +573,6 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
}
columnDefinitionsSw.Stop();
TelemetryInstance.TrackDuration(TelemetryEventName.GetColumnDefinitions, columnDefinitionsSw.ElapsedMilliseconds, sqlConnProps);
logger.LogDebugWithThreadId($"END GetColumnDefinitions Duration={columnDefinitionsSw.ElapsedMilliseconds}ms");
}

}
Expand All @@ -610,9 +598,8 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
try
{
string getPrimaryKeysQuery = GetPrimaryKeysQuery(table);
logger.LogDebugWithThreadId($"BEGIN GetPrimaryKeys Query=\"{getPrimaryKeysQuery}\"");
var cmd = new SqlCommand(getPrimaryKeysQuery, sqlConnection);
using (SqlDataReader rdr = await cmd.ExecuteReaderAsync())
using (SqlDataReader rdr = await cmd.ExecuteReaderAsyncWithLogging(logger, CancellationToken.None))
{
while (await rdr.ReadAsync())
{
Expand All @@ -621,7 +608,6 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
}
primaryKeysSw.Stop();
TelemetryInstance.TrackDuration(TelemetryEventName.GetPrimaryKeys, primaryKeysSw.ElapsedMilliseconds, sqlConnProps);
logger.LogDebugWithThreadId($"END GetPrimaryKeys Duration={primaryKeysSw.ElapsedMilliseconds}ms");
}
}
catch (Exception ex)
Expand Down Expand Up @@ -670,7 +656,7 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
sqlConnProps.Add(TelemetryPropertyName.QueryType, queryType.ToString());
sqlConnProps.Add(TelemetryPropertyName.HasIdentityColumn, hasIdentityColumnPrimaryKeys.ToString());
TelemetryInstance.TrackDuration(TelemetryEventName.GetTableInfo, tableInfoSw.ElapsedMilliseconds, sqlConnProps, durations);
logger.LogDebugWithThreadId($"END RetrieveTableInformationAsync Duration={tableInfoSw.ElapsedMilliseconds}ms DB and Table: {sqlConnection.Database}.{fullName}. Primary keys: [{string.Join(",", primaryKeys.Select(pk => pk.Name))}]. SQL Column and Definitions: [{string.Join(",", columnDefinitionsFromSQL)}] Object columns: [{string.Join(",", objectColumnNames)}]");
logger.LogDebug($"RetrieveTableInformationAsync DB and Table: {sqlConnection.Database}.{fullName}. Primary keys: [{string.Join(",", primaryKeys.Select(pk => pk.Name))}].\nSQL Column and Definitions: [{string.Join(",", columnDefinitionsFromSQL)}]\nObject columns: [{string.Join(",", objectColumnNames)}]");
return new TableInformation(primaryKeys, primaryKeyProperties, columnDefinitionsFromSQL, queryType, hasIdentityColumnPrimaryKeys);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/SqlBindingConfigProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void Initialize(ExtensionConfigContext context)
LogDependentAssemblyVersions(logger);
#pragma warning disable CS0618 // Fine to use this for our stuff
FluentBindingRule<SqlAttribute> inputOutputRule = context.AddBindingRule<SqlAttribute>();
var converter = new SqlConverter(this._configuration, logger);
var converter = new SqlConverter(this._configuration);
inputOutputRule.BindToInput(converter);
inputOutputRule.BindToInput<string>(typeof(SqlGenericsConverter<string>), this._configuration, logger);
inputOutputRule.BindToCollector<SQLObjectOpenType>(typeof(SqlAsyncCollectorBuilder<>), this._configuration, logger);
Expand Down
49 changes: 42 additions & 7 deletions src/SqlBindingUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,8 @@ public static async Task VerifyDatabaseSupported(SqlConnection connection, ILogg

string verifyDatabaseSupportedQuery = $"SELECT compatibility_level FROM sys.databases WHERE Name = DB_NAME()";

logger.LogDebugWithThreadId($"BEGIN VerifyDatabaseSupported Query={verifyDatabaseSupportedQuery}");
using (var verifyDatabaseSupportedCommand = new SqlCommand(verifyDatabaseSupportedQuery, connection))
using (SqlDataReader reader = await verifyDatabaseSupportedCommand.ExecuteReaderAsync(cancellationToken))
using (SqlDataReader reader = await verifyDatabaseSupportedCommand.ExecuteReaderAsyncWithLogging(logger, cancellationToken))
{
if (!await reader.ReadAsync(cancellationToken))
{
Expand All @@ -204,7 +203,6 @@ public static async Task VerifyDatabaseSupported(SqlConnection connection, ILogg

int compatLevel = reader.GetByte(0);

logger.LogDebugWithThreadId($"END GetUserTableId CompatLevel={compatLevel}");
if (compatLevel < MIN_SUPPORTED_COMPAT_LEVEL)
{
throw new InvalidOperationException($"SQL bindings require a database compatibility level of 130 or higher to function. Current compatibility level = {compatLevel}");
Expand Down Expand Up @@ -275,7 +273,6 @@ internal static async Task<bool> TryEnsureConnected(this SqlConnection conn,
if (forceReconnect || conn.State.HasFlag(ConnectionState.Broken | ConnectionState.Closed))
{
logger.LogWarning($"{connectionName} is broken, attempting to reconnect...");
logger.LogDebugWithThreadId($"BEGIN RetryOpen{connectionName}");
try
{
// Sometimes the connection state is listed as open even if a fatal exception occurred, see
Expand All @@ -287,7 +284,6 @@ internal static async Task<bool> TryEnsureConnected(this SqlConnection conn,
}
await conn.OpenAsync(token);
logger.LogInformation($"Successfully re-established {connectionName}!");
logger.LogDebugWithThreadId($"END RetryOpen{connectionName}");
return true;
}
catch (Exception e)
Expand All @@ -311,9 +307,8 @@ public static async Task<ServerProperties> GetServerTelemetryProperties(SqlConne
{
string serverPropertiesQuery = $"SELECT SERVERPROPERTY('EngineEdition'), SERVERPROPERTY('Edition')";

logger.LogDebugWithThreadId($"BEGIN GetServerTelemetryProperties Query={serverPropertiesQuery}");
using (var selectServerEditionCommand = new SqlCommand(serverPropertiesQuery, connection))
using (SqlDataReader reader = await selectServerEditionCommand.ExecuteReaderAsync(cancellationToken))
using (SqlDataReader reader = await selectServerEditionCommand.ExecuteReaderAsyncWithLogging(logger, cancellationToken))
{
if (await reader.ReadAsync(cancellationToken))
{
Expand Down Expand Up @@ -366,5 +361,45 @@ public static async Task<ServerProperties> GetServerTelemetryProperties(SqlConne
}
return null;
}

/// <summary>
/// Calls ExecuteNonQueryAsync and logs an error if it fails before rethrowing.
/// </summary>
/// <param name="cmd">The SqlCommand being executed</param>
/// <param name="logger">The logger</param>
/// <param name="cancellationToken">The cancellation token to pass to the call</param>
/// <returns>The result of the call</returns>
public static async Task<int> ExecuteNonQueryAsyncWithLogging(this SqlCommand cmd, ILogger logger, CancellationToken cancellationToken)
{
try
{
return await cmd.ExecuteNonQueryAsync(cancellationToken);
}
catch (Exception e)
{
logger.LogError($"Exception executing query. Message={e.Message}\nQuery={cmd.CommandText}");
throw;
}
}

/// <summary>
/// Calls ExecuteReaderAsync and logs an error if it fails before rethrowing.
/// </summary>
/// <param name="cmd">The SqlCommand being executed</param>
/// <param name="logger">The logger</param>
/// <param name="cancellationToken">The cancellation token to pass to the call</param>
/// <returns>The result of the call</returns>
public static async Task<SqlDataReader> ExecuteReaderAsyncWithLogging(this SqlCommand cmd, ILogger logger, CancellationToken cancellationToken)
{
try
{
return await cmd.ExecuteReaderAsync(cancellationToken);
}
catch (Exception e)
{
logger.LogError($"Exception executing query. Message={e.Message}\nQuery={cmd.CommandText}");
throw;
}
}
}
}
26 changes: 4 additions & 22 deletions src/SqlConverters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Sql.Telemetry;
Expand All @@ -23,20 +22,17 @@ internal class SqlConverters
internal class SqlConverter : IConverter<SqlAttribute, SqlCommand>
{
private readonly IConfiguration _configuration;
private readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="SqlConverter"/> class.
/// </summary>
/// <param name="configuration"></param>
/// <param name="logger">ILogger used to log information and warnings</param>
/// <exception cref="ArgumentNullException">
/// Thrown if the configuration is null
/// </exception>
public SqlConverter(IConfiguration configuration, ILogger logger)
public SqlConverter(IConfiguration configuration)
{
this._configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
this._logger = logger;
TelemetryInstance.TrackCreate(CreateType.SqlConverter);
}

Expand All @@ -51,14 +47,10 @@ public SqlConverter(IConfiguration configuration, ILogger logger)
public SqlCommand Convert(SqlAttribute attribute)
{
TelemetryInstance.TrackConvert(ConvertType.SqlCommand);
this._logger.LogDebugWithThreadId("BEGIN Convert (SqlCommand)");
var sw = Stopwatch.StartNew();
try
{
SqlCommand command = SqlBindingUtilities.BuildCommand(attribute, SqlBindingUtilities.BuildConnection(
return SqlBindingUtilities.BuildCommand(attribute, SqlBindingUtilities.BuildConnection(
attribute.ConnectionStringSetting, this._configuration));
this._logger.LogDebugWithThreadId($"END Convert (SqlCommand) Duration={sw.ElapsedMilliseconds}ms");
return command;
}
catch (Exception ex)
{
Expand Down Expand Up @@ -107,14 +99,10 @@ public SqlGenericsConverter(IConfiguration configuration, ILogger logger)
/// <returns>An IEnumerable containing the rows read from the user's database in the form of the user-defined POCO</returns>
public async Task<IEnumerable<T>> ConvertAsync(SqlAttribute attribute, CancellationToken cancellationToken)
{
this._logger.LogDebugWithThreadId("BEGIN ConvertAsync (IEnumerable)");
var sw = Stopwatch.StartNew();
try
{
string json = await this.BuildItemFromAttributeAsync(attribute, ConvertType.IEnumerable);
IEnumerable<T> result = Utils.JsonDeserializeObject<IEnumerable<T>>(json);
this._logger.LogDebugWithThreadId($"END ConvertAsync (IEnumerable) Duration={sw.ElapsedMilliseconds}ms");
return result;
return Utils.JsonDeserializeObject<IEnumerable<T>>(json);
}
catch (Exception ex)
{
Expand All @@ -141,13 +129,9 @@ public async Task<IEnumerable<T>> ConvertAsync(SqlAttribute attribute, Cancellat
/// </returns>
async Task<string> IAsyncConverter<SqlAttribute, string>.ConvertAsync(SqlAttribute attribute, CancellationToken cancellationToken)
{
this._logger.LogDebugWithThreadId("BEGIN ConvertAsync (Json)");
var sw = Stopwatch.StartNew();
try
{
string result = await this.BuildItemFromAttributeAsync(attribute, ConvertType.Json);
this._logger.LogDebugWithThreadId($"END ConvertAsync (Json) Duration={sw.ElapsedMilliseconds}ms");
return result;
return await this.BuildItemFromAttributeAsync(attribute, ConvertType.Json);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -180,9 +164,7 @@ public virtual async Task<string> BuildItemFromAttributeAsync(SqlAttribute attri
using (SqlCommand command = SqlBindingUtilities.BuildCommand(attribute, connection))
{
adapter.SelectCommand = command;
this._logger.LogDebugWithThreadId("BEGIN OpenBuildItemFromAttributeAsyncConnection");
await connection.OpenAsyncWithSqlErrorHandling(CancellationToken.None);
this._logger.LogDebugWithThreadId("END OpenBuildItemFromAttributeAsyncConnection");
this._serverProperties = await SqlBindingUtilities.GetServerTelemetryProperties(connection, this._logger, CancellationToken.None);
Dictionary<TelemetryPropertyName, string> props = connection.AsConnectionProps(this._serverProperties);
TelemetryInstance.TrackConvert(type, props);
Expand Down
Loading