Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,24 @@ protected override void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing && _producerTask.Status == TaskStatus.Running)

if (disposing)
{
_toProduce.Post(0);
_toProduce.Complete();
_producerTask.Wait();

// Complete the consumer after the producerTask has finished, since producerTask could
// have posted more items to _toConsume.
_toConsume.Complete();

// Drain both BufferBlocks - this prevents what appears to be memory leaks when using the VS Debugger
// because if a BufferBlock still contains items, its underlying Tasks are not getting completed.
// See https://github.com/dotnet/corefx/issues/30582 for the VS Debugger issue.
// See also https://github.com/dotnet/machinelearning/issues/4399
_toProduce.TryReceiveAll(out _);
_toConsume.TryReceiveAll(out _);
}

_disposed = true;
base.Dispose(disposing);
}
Expand All @@ -578,15 +591,16 @@ private async Task LoopProducerWorker()
try
{
int circularIndex = 0;
for (; ; )
while (await _toProduce.OutputAvailableAsync().ConfigureAwait(false))
{
int requested = await _toProduce.ReceiveAsync();
if (requested == 0)
int requested;
if (!_toProduce.TryReceive(out requested))
{
// We had some sort of early exit. Just go out, do not post even the
// sentinel to the consumer, as nothing will be consumed any more.
return;
// OutputAvailableAsync returned true, but TryReceive returned false -
// so loop back around and try again.
continue;
}

Ch.Assert(requested >= _blockSize);
int numRows;
for (numRows = 0; numRows < requested; ++numRows)
Expand Down