diff --git a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs
index c6eb4f9483..265783f137 100644
--- a/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs
+++ b/src/Microsoft.ML.Data/Transforms/RowShufflingTransformer.cs
@@ -553,11 +553,24 @@ protected override void Dispose(bool disposing)
             {
                 if (_disposed)
                     return;
-                if (disposing && _producerTask.Status == TaskStatus.Running)
+
+                if (disposing)
                 {
-                    _toProduce.Post(0);
+                    _toProduce.Complete();
                     _producerTask.Wait();
+
+                    // Complete the consumer after the producerTask has finished, since producerTask could
+                    // have posted more items to _toConsume.
+                    _toConsume.Complete();
+
+                    // Drain both BufferBlocks - this prevents what appears to be memory leaks when using the VS Debugger
+                    // because if a BufferBlock still contains items, its underlying Tasks are not getting completed.
+                    // See https://github.com/dotnet/corefx/issues/30582 for the VS Debugger issue.
+                    // See also https://github.com/dotnet/machinelearning/issues/4399
+                    _toProduce.TryReceiveAll(out _);
+                    _toConsume.TryReceiveAll(out _);
                 }
+
                 _disposed = true;
                 base.Dispose(disposing);
             }
@@ -578,15 +591,16 @@ private async Task LoopProducerWorker()
                 try
                 {
                     int circularIndex = 0;
-                    for (; ; )
+                    while (await _toProduce.OutputAvailableAsync().ConfigureAwait(false))
                     {
-                        int requested = await _toProduce.ReceiveAsync();
-                        if (requested == 0)
+                        int requested;
+                        if (!_toProduce.TryReceive(out requested))
                         {
-                            // We had some sort of early exit. Just go out, do not post even the
-                            // sentinel to the consumer, as nothing will be consumed any more.
-                            return;
+                            // OutputAvailableAsync returned true, but TryReceive returned false -
+                            // so loop back around and try again.
+                            continue;
                         }
+
                         Ch.Assert(requested >= _blockSize);
                         int numRows;
                         for (numRows = 0; numRows < requested; ++numRows)