Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/System.Data.SqlClient/src/System.Data.SqlClient.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ProjectGuid>{D4550556-4745-457F-BA8F-3EBF3836D6B4}</ProjectGuid>
<AssemblyName>System.Data.SqlClient</AssemblyName>
Expand Down
225 changes: 113 additions & 112 deletions src/System.Data.SqlClient/src/System/Data/SqlClient/SqlBulkCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1841,7 +1841,10 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok
}
else
{
AsyncHelper.ContinueTask(writeTask, tcs, () => tcs.SetResult(null));
AsyncHelper.ContinueTaskWithState(writeTask, tcs,
state: tcs,
onSuccess: state => ((TaskCompletionSource<object>)state).SetResult(null)
);
}
}, ctoken); // We do not need to propagate exception, etc, from reconnect task, we just need to wait for it to finish.
return tcs.Task;
Expand Down Expand Up @@ -2153,17 +2156,17 @@ private Task CopyColumnsAsync(int col, TaskCompletionSource<object> source = nul
private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> source, Task task, int i)
{
AsyncHelper.ContinueTask(task, source, () =>
{
if (i + 1 < _sortedColumnMappings.Count)
{
CopyColumnsAsync(i + 1, source); //continue from the next column
}
else
{
source.SetResult(null);
if (i + 1 < _sortedColumnMappings.Count)
{
CopyColumnsAsync(i + 1, source); //continue from the next column
}
else
{
source.SetResult(null);
}
}
},
_connection.GetOpenTdsConnection());
);
}

// The notification logic.
Expand Down Expand Up @@ -2257,24 +2260,6 @@ private Task CheckForCancellation(CancellationToken cts, TaskCompletionSource<ob
}
}

private TaskCompletionSource<object> ContinueTaskPend(Task task, TaskCompletionSource<object> source, Func<TaskCompletionSource<object>> action)
{
if (task == null)
{
return action();
}
else
{
Debug.Assert(source != null, "source should already be initialized if task is not null");
AsyncHelper.ContinueTask(task, source, () =>
{
TaskCompletionSource<object> newSource = action();
Debug.Assert(newSource == null, "Shouldn't create a new source when one already exists");
});
}
return null;
}

// Copies all the rows in a batch.
// Maintains state machine with state variable: rowSoFar.
// Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
Expand Down Expand Up @@ -2315,7 +2300,7 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
}
resultTask = source.Task;

AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source));
return resultTask; // Associated task will be completed when all rows are copied to server/exception/cancelled.
}
}
Expand All @@ -2325,19 +2310,20 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
resultTask = source.Task;

AsyncHelper.ContinueTask(task, source, onSuccess: () =>
{
CheckAndRaiseNotification(); // Check for notification now as the current row copy is done at this moment.

Task readTask = ReadFromRowSourceAsync(cts);
if (readTask == null)
{
CopyRowsAsync(i + 1, totalRows, cts, source);
}
else
{
AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
CheckAndRaiseNotification(); // Check for notification now as the current row copy is done at this moment.

Task readTask = ReadFromRowSourceAsync(cts);
if (readTask == null)
{
CopyRowsAsync(i + 1, totalRows, cts, source);
}
else
{
AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source));
}
}
}, connectionToDoom: _connection.GetOpenTdsConnection());
);
return resultTask;
}
}
Expand Down Expand Up @@ -2406,15 +2392,17 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up
source = new TaskCompletionSource<object>();
}

AsyncHelper.ContinueTask(commandTask, source, () =>
{
Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
AsyncHelper.ContinueTask(commandTask, source,
() =>
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
}
}, _connection.GetOpenTdsConnection());
);
return source.Task;
}
}
Expand Down Expand Up @@ -2462,15 +2450,19 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults,
{ // First time only
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(task, source, () =>
{
Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
AsyncHelper.ContinueTask(task, source,
onSuccess: () =>
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
}, _connection.GetOpenTdsConnection(), _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), () => CopyBatchesAsyncContinuedOnError(cleanupParser: true));
Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null)
{
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
},
onFailure: (_) => CopyBatchesAsyncContinuedOnError(cleanupParser: false),
onCancellation: () => CopyBatchesAsyncContinuedOnError(cleanupParser: true)
);

return source.Task;
}
Expand Down Expand Up @@ -2517,22 +2509,25 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal
source = new TaskCompletionSource<object>();
}

AsyncHelper.ContinueTask(writeTask, source, () =>
{
try
{
RunParser();
CommitTransaction();
}
catch (Exception)
AsyncHelper.ContinueTask(writeTask, source,
onSuccess: () =>
{
CopyBatchesAsyncContinuedOnError(cleanupParser: false);
throw;
}
try
{
RunParser();
CommitTransaction();
}
catch (Exception)
{
CopyBatchesAsyncContinuedOnError(cleanupParser: false);
throw;
}

// Always call back into CopyBatchesAsync
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}, connectionToDoom: _connection.GetOpenTdsConnection(), onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false));
// Always call back into CopyBatchesAsync
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
},
onFailure: (_) => CopyBatchesAsyncContinuedOnError(cleanupParser: false)
);
return source.Task;
}
}
Expand Down Expand Up @@ -2651,48 +2646,50 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int
{
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(task, source, () =>
{
// Bulk copy task is completed at this moment.
if (task.IsCanceled)
AsyncHelper.ContinueTask(task, source,
() =>
{
_localColumnMappings = null;
try
// Bulk copy task is completed at this moment.
if (task.IsCanceled)
{
CleanUpStateObjectOnError();
_localColumnMappings = null;
try
{
CleanUpStateObjectOnError();
}
finally
{
source.SetCanceled();
}
}
finally
else if (task.Exception != null)
{
source.SetCanceled();
source.SetException(task.Exception.InnerException);
}
}
else if (task.Exception != null)
{
source.SetException(task.Exception.InnerException);
}
else
{
_localColumnMappings = null;
try
{
CleanUpStateObjectOnError();
}
finally
else
{
if (source != null)
_localColumnMappings = null;
try
{
if (cts.IsCancellationRequested)
{ // We may get cancellation req even after the entire copy.
source.SetCanceled();
}
else
CleanUpStateObjectOnError();
}
finally
{
if (source != null)
{
source.SetResult(null);
if (cts.IsCancellationRequested)
{ // We may get cancellation req even after the entire copy.
source.SetCanceled();
}
else
{
source.SetResult(null);
}
}
}
}
}
}, _connection.GetOpenTdsConnection());
);
return;
}
else
Expand Down Expand Up @@ -2782,12 +2779,15 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
{
regReconnectCancel = cts.Register(s => ((TaskCompletionSource<object>)s).TrySetCanceled(), cancellableReconnectTS);
}
AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); });
AsyncHelper.ContinueTaskWithState(reconnectTask, cancellableReconnectTS,
state: cancellableReconnectTS,
onSuccess: (state) => { ((TaskCompletionSource<object>)state).SetResult(null); }
);
// No need to cancel timer since SqlBulkCopy creates specific task source for reconnection.
AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout,
() => { return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout()); }, CancellationToken.None);
AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source,
() =>
onSuccess: () =>
{
regReconnectCancel.Dispose();
if (_parserLock != null)
Expand All @@ -2799,7 +2799,6 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
_parserLock.Wait(canReleaseFromAnyThread: true);
WriteToServerInternalRestAsync(cts, source);
},
connectionToAbort: _connection,
onFailure: (e) => { regReconnectCancel.Dispose(); },
onCancellation: () => { regReconnectCancel.Dispose(); },
exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex));
Expand Down Expand Up @@ -2850,7 +2849,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio

if (internalResultsTask != null)
{
AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), _connection.GetOpenTdsConnection());
AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source));
}
else
{
Expand Down Expand Up @@ -2921,17 +2920,19 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
else
{
Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode");
AsyncHelper.ContinueTask(readTask, source, () =>
{
if (!_hasMoreRowToCopy)
{
source.SetResult(null); // No rows to copy!
}
else
AsyncHelper.ContinueTask(readTask, source,
() =>
{
WriteToServerInternalRestAsync(ctoken, source); // Passing the same completion which will be completed by the Callee.
if (!_hasMoreRowToCopy)
{
source.SetResult(null); // No rows to copy!
}
else
{
WriteToServerInternalRestAsync(ctoken, source); // Passing the same completion which will be completed by the Callee.
}
}
}, _connection.GetOpenTdsConnection());
);
return resultTask;
}
}
Expand Down
Loading