using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

namespace ConsoleApp1
{
    /// <summary>
    /// Console stress test: starts many concurrent workers that each repeatedly open a
    /// connection, run <c>select @Id as Id</c> inside a transaction and verify that the
    /// value read back equals the worker's own id. Outcomes are tallied in shared counters
    /// and printed periodically until Ctrl+C is pressed.
    /// </summary>
    class Program
    {
        /// <summary>Number of concurrent worker loops (also used as the thread-pool minimum worker count).</summary>
        private const int WorkerCount = 4000;

        /// <summary>Maximum number of exception-message characters written per error line.</summary>
        private const int MaxLoggedMessageLength = 20;

        /// <summary>Interval between two counter snapshots printed by the monitor.</summary>
        private static readonly TimeSpan MonitorInterval = TimeSpan.FromSeconds(5);

        private static string _connectionString;

        // Source cancelled by the Ctrl+C handler; only non-null while Main is running the test.
        private static CancellationTokenSource _cancellation;

        // Outcome counters, incremented with Interlocked by the workers.
        private static int _successes;
        private static int _networkErrors;
        private static int _invalidResults;
        private static int _missingResults;

        // Server process id of the first connection that returned a wrong result (0 = none yet).
        private static int _tracing;

        // Server process id of a connection that returned correct results twice in one worker
        // after a wrong result had been observed somewhere (0 = none yet).
        private static int _normal;

        /// <summary>
        /// Builds the connection string, wires up Ctrl+C cancellation, runs the test under the
        /// profiler and saves the profiler data once the workers have stopped.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            _tracing = 0;
            AppContext.SetSwitch("Switch.Microsoft.Data.SqlClient.UseManagedNetworkingOnWindows", true);

            // The original string listed "Connect Timeout" twice (1, then 3). The last
            // occurrence of a keyword wins, so only the effective value is kept.
            var builder = new SqlConnectionStringBuilder(
                @"Data Source=(local);Initial Catalog=Scratch;Trusted_Connection=true;Connect Timeout=3;ConnectRetryCount=1;")
            {
                MultipleActiveResultSets = true
            };
            _connectionString = builder.ToString();

            using (var cts = new CancellationTokenSource())
            {
                _cancellation = cts;
                Console.CancelKeyPress += CancelExecution;
                try
                {
                    JetBrains.Profiler.Api.MeasureProfiler.StartCollectingData();
                    await ExecuteTestAsync(cts.Token);
                }
                finally
                {
                    // Detach before the source is disposed so a late Ctrl+C cannot touch it.
                    Console.CancelKeyPress -= CancelExecution;
                    _cancellation = null;
                }

                JetBrains.Profiler.Api.MeasureProfiler.StopCollectingData();
                JetBrains.Profiler.Api.MeasureProfiler.SaveData();
                Console.WriteLine("Done!");
            }

            //PlainCancel(_connectionString);
        }

        /// <summary>
        /// Standalone check (not called by default): starts a multi-statement batch, cancels the
        /// command immediately, then drains the reader. Prints "worked" when draining fails with
        /// the "A severe error occurred on the current command." <see cref="SqlException"/>.
        /// </summary>
        /// <param name="connString">Connection string to open the connection with.</param>
        private static void PlainCancel(string connString)
        {
            using (SqlConnection conn = new SqlConnection(connString))
            using (SqlCommand cmd = new SqlCommand("select * from JobResults; waitfor delay '00:00:10'; select * from JobResults", conn))
            {
                conn.Open();
                using (SqlDataReader reader = cmd.ExecuteReader())
                {
                    try
                    {
                        cmd.Cancel();
                        do
                        {
                            while (reader.Read())
                            {
                            }
                        } while (reader.NextResult());
                    }
                    // Match on the first sentence only: it does not depend on the exact
                    // whitespace that separates it from the second sentence of the message.
                    catch (SqlException ex) when (ex.Message.StartsWith("A severe error occurred on the current command.", StringComparison.Ordinal))
                    {
                        Console.WriteLine("worked");
                    }
                }
            }
        }

        /// <summary>
        /// Starts the monitor and <see cref="WorkerCount"/> worker loops, waits for all workers
        /// to finish, then stops the monitor.
        /// </summary>
        /// <param name="cancellationToken">Token that tells the worker loops to stop.</param>
        private static async Task ExecuteTestAsync(CancellationToken cancellationToken)
        {
            using var monitorCts = new CancellationTokenSource();
            var monitorTask = MonitorAsync(monitorCts.Token);

            // Raise the thread-pool minimums: one worker thread per loop, and the I/O
            // completion minimum set to the current I/O maximum.
            ThreadPool.GetMaxThreads(out int _, out int ioThreads);
            ThreadPool.SetMinThreads(WorkerCount, ioThreads);

            var tasks = new Task[WorkerCount];
            Console.WriteLine($"Starting {tasks.Length} tasks...");
            for (var i = 0; i < tasks.Length; i++)
            {
                tasks[i] = ExecuteLoopAsync(i, cancellationToken);
            }

            try
            {
                await Task.WhenAll(tasks);
            }
            finally
            {
                // Always stop the monitor, even when a worker faulted.
                monitorCts.Cancel();
                await monitorTask;
            }
        }

        /// <summary>
        /// Worker loop: runs transactions until cancellation is requested and classifies every
        /// outcome as network error, missing result, invalid result or success.
        /// </summary>
        /// <param name="id">Worker id; also the value sent to and expected back from the server.</param>
        /// <param name="cancellationToken">Checked between iterations to stop the loop.</param>
        private static async Task ExecuteLoopAsync(int id, CancellationToken cancellationToken)
        {
            await Task.Yield();

            // Server process ids that have returned a correct result to this worker.
            var successful = new HashSet<int>();

            while (!cancellationToken.IsCancellationRequested)
            {
                // SqlException / InvalidOperationException are returned, not thrown,
                // by ExecuteTransactionAsync.
                var (result, spid, exception) = await ExecuteTransactionAsync(id);

                if (exception != null)
                {
                    Interlocked.Increment(ref _networkErrors);
                    if (spid != 0)
                    {
                        // spid != 0 means the connection had been opened before the failure.
                        Console.WriteLine($"{spid}, {id}, {exception.GetType()}, {Truncate(exception.Message, MaxLoggedMessageLength)}");
                    }
                }
                else if (!result.HasValue)
                {
                    Interlocked.Increment(ref _missingResults);
                }
                else if (result != id)
                {
                    Interlocked.Increment(ref _invalidResults);
                    successful.Remove(spid);

                    // Atomically claim the "traced" spid so exactly one worker publishes it.
                    int previous = Interlocked.CompareExchange(ref _tracing, spid, 0);
                    if (previous == 0)
                    {
                        Environment.SetEnvironmentVariable("_TRACING", spid.ToString());
                    }
                    else if (spid == previous)
                    {
                        Console.WriteLine($"{spid}, {id}, {result}");
                    }
                }
                else
                {
                    Interlocked.Increment(ref _successes);

                    // Once a traced spid exists, publish the first spid that succeeds a
                    // second time in this worker as the "normal" reference connection.
                    if (Volatile.Read(ref _tracing) != 0
                        && successful.Contains(spid)
                        && Interlocked.CompareExchange(ref _normal, spid, 0) == 0)
                    {
                        Environment.SetEnvironmentVariable("_NORMAL", spid.ToString());
                    }

                    successful.Add(spid);
                }
            }
        }

        /// <summary>
        /// Opens a connection, runs <c>select @Id as Id</c> in a read-committed transaction and
        /// commits.
        /// </summary>
        /// <param name="id">Value bound to the <c>Id</c> parameter.</param>
        /// <returns>
        /// A tuple of: the value read (null when no row was read), the server process id of the
        /// connection (0 when it was never opened), and the caught <see cref="SqlException"/> or
        /// <see cref="InvalidOperationException"/> (null on success).
        /// </returns>
        private static async Task<(int?,int,Exception)> ExecuteTransactionAsync(int id)
        {
            int? result = null;
            int spid = 0;
            Exception exception = null;

            try
            {
                await using var connection = new SqlConnection(_connectionString);
                await connection.OpenAsync();
                spid = connection.ServerProcessId;

                await using var tx = connection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
                await using var command = new SqlCommand(@"select @Id as Id", connection, tx);
                command.CommandTimeout = 1;
                command.Parameters.AddWithValue("Id", id);

                //if (_tracing != 0 && connection.ServerProcessId == _tracing)
                //{
                //    Debugger.Break();
                //}

                using (var reader = await command.ExecuteReaderAsync())
                {
                    if (await reader.ReadAsync())
                    {
                        result = reader.GetInt32(0);
                    }
                }

                await tx.CommitAsync();
            }
            catch (Exception e) when (e is SqlException || e is InvalidOperationException)
            {
                exception = e;
            }

            return (result, spid, exception);
        }

        /// <summary>
        /// Prints a snapshot of the outcome counters every <see cref="MonitorInterval"/> until
        /// cancelled, plus one final snapshot when cancellation interrupts the wait.
        /// </summary>
        /// <param name="cancellationToken">Token that stops the monitor.</param>
        private static async Task MonitorAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    await Task.Delay(MonitorInterval, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // Cancellation cuts the wait short; fall through to print the final snapshot.
                }

                var successes = _successes;
                var networkErrors = _networkErrors;
                var invalidResults = _invalidResults;
                var missingResults = _missingResults;
                int total = successes + networkErrors + invalidResults + missingResults;

                Console.WriteLine($"Processed: {total,6} - Network errors: {networkErrors,6} - Missing: {missingResults,6} - Invalid: {invalidResults,6}");
            }
        }

        /// <summary>
        /// Ctrl+C handler: keeps the process alive and asks the worker loops to stop so that
        /// <see cref="Main"/> can finish and save the profiler data.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="args">Event data; <c>Cancel</c> is set to true.</param>
        private static void CancelExecution(object sender, ConsoleCancelEventArgs args)
        {
            // Set the Cancel property to true to prevent the process from terminating.
            args.Cancel = true;
            Console.WriteLine("Requested cancellation..");

            try
            {
                _cancellation?.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // Main has already finished and disposed the source; nothing left to cancel.
            }
        }

        /// <summary>Returns at most <paramref name="maxLength"/> leading characters of <paramref name="text"/>.</summary>
        /// <param name="text">Text to shorten; null is treated as empty.</param>
        /// <param name="maxLength">Maximum number of characters to keep.</param>
        private static string Truncate(string text, int maxLength)
        {
            if (string.IsNullOrEmpty(text))
            {
                return string.Empty;
            }

            return text.Length <= maxLength ? text : text.Substring(0, maxLength);
        }
    }
}