Skip to content

Commit

Permalink
Merge pull request dotnet/corefx#2929 from stephentoub/filestream_asy…
Browse files Browse the repository at this point in the history
…nc_buffering

Improve FileStream WriteAsync buffering and performance

Commit migrated from dotnet/corefx@6cffbc2
  • Loading branch information
stephentoub committed Aug 26, 2015
2 parents 7c716f7 + 26253f6 commit 326e7d7
Show file tree
Hide file tree
Showing 8 changed files with 669 additions and 179 deletions.
154 changes: 117 additions & 37 deletions src/libraries/System.IO.FileSystem/src/System/IO/Win32FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ internal sealed partial class Win32FileStream : FileStreamBase
private long _appendStart;// When appending, prevent overwriting file.

private Task<int> _lastSynchronouslyCompletedTask = null;
// Task for the asynchronous flush of the internal write buffer. Assigned by
// FlushWriteAsync; if an earlier flush is still running when a new one starts,
// the two are combined with Task.WhenAll. Queried via HasActiveBufferOperation.
private Task _activeBufferOperation = null;

[System.Security.SecuritySafeCritical]
public Win32FileStream(String path, FileMode mode, FileAccess access, FileShare share, int bufferSize, FileOptions options, FileStream parent) : base(parent)
Expand Down Expand Up @@ -331,6 +332,10 @@ private unsafe void VerifyHandleIsSync()
}
}

// True while a previously issued buffer operation exists and has not yet
// reached a completed state (success, fault, or cancellation all count as completed).
private bool HasActiveBufferOperation
{
    get
    {
        Task bufferOperation = _activeBufferOperation;
        if (bufferOperation == null)
        {
            return false;
        }

        return !bufferOperation.IsCompleted;
    }
}

public override bool CanRead
{
Expand Down Expand Up @@ -520,6 +525,26 @@ private void FlushRead()
_readLen = 0;
}

// Starts an asynchronous write of whatever currently sits in the internal
// write buffer and returns the task for that write.
//
// cancellationToken: forwarded to WriteInternalCoreAsync.
// Returns: Task.CompletedTask when the buffer holds no pending bytes;
//          otherwise the task returned by WriteInternalCoreAsync.
//
// Side effects: resets _writePos to 0 and updates _activeBufferOperation so
// that it covers the newly issued write (plus any still-running earlier one).
private Task FlushWriteAsync(CancellationToken cancellationToken)
{
    Debug.Assert(_isAsync);
    Debug.Assert(_readPos == 0 && _readLen == 0, "FileStream: Read buffer must be empty in FlushWriteAsync!");

    // Nothing buffered: avoid issuing a write at all.
    int pendingBytes = _writePos;
    if (pendingBytes == 0)
    {
        return Task.CompletedTask;
    }

    // Kick off the write of the buffered bytes, then mark the buffer as empty.
    Task pendingWrite = WriteInternalCoreAsync(_buffer, 0, pendingBytes, cancellationToken);
    _writePos = 0;

    // Track the write as the active buffer operation. If an earlier operation
    // is still in flight, the tracked task must represent both of them.
    if (HasActiveBufferOperation)
    {
        _activeBufferOperation = Task.WhenAll(_activeBufferOperation, pendingWrite);
    }
    else
    {
        _activeBufferOperation = pendingWrite;
    }

    // Callers get the task for this write only, not the combined one.
    return pendingWrite;
}

// Writes are buffered. Anytime the buffer fills up
// (_writePos + delta > _bufferSize) or the buffer switches to reading
// and there is left over data (_writePos > 0), this function must be called.
Expand All @@ -529,7 +554,7 @@ private void FlushWrite(bool calledFromFinalizer)

if (_isAsync)
{
Task writeTask = WriteInternalCoreAsync(_buffer, 0, _writePos, CancellationToken.None);
Task writeTask = FlushWriteAsync(CancellationToken.None);
// With our Whidbey async IO & overlapped support for AD unloads,
// we don't strictly need to block here to release resources
// since that support takes care of the pinning & freeing the
Expand Down Expand Up @@ -1259,52 +1284,107 @@ private Task WriteInternalAsync(byte[] array, int offset, int numBytes, Cancella
if (!_parent.CanWrite) throw __Error.GetWriteNotSupported();

Debug.Assert((_readPos == 0 && _readLen == 0 && _writePos >= 0) || (_writePos == 0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
Debug.Assert(!_isPipe || (_readPos == 0 && _readLen == 0), "Win32FileStream must not have buffered data here! Pipes should be unidirectional.");

if (_isPipe)
bool writeDataStoredInBuffer = false;
if (!_isPipe) // avoid async buffering with pipes, as doing so can lead to deadlocks (see comments in ReadInternalAsyncCore)
{
// Pipes are tricky, at least when you have 2 different pipes
// that you want to use simultaneously. When redirecting stdout
// & stderr with the Process class, it's easy to deadlock your
// parent & child processes when doing writes 4K at a time. The
// OS appears to use a 4K buffer internally. If you write to a
// pipe that is full, you will block until someone reads from
// that pipe. If you try reading from an empty pipe and
// Win32FileStream's ReadAsync blocks waiting for data to fill its
// internal buffer, you will be blocked. In a case where a child
// process writes to stdout & stderr while a parent process tries
// reading from both, you can easily get into a deadlock here.
// To avoid this deadlock, don't buffer when doing async IO on
// pipes.
Debug.Assert(_readPos == 0 && _readLen == 0, "Win32FileStream must not have buffered data here! Pipes should be unidirectional.");
// Ensure the buffer is clear for writing
if (_writePos == 0)
{
if (_readPos < _readLen)
{
FlushRead();
}
_readPos = 0;
_readLen = 0;
}

if (_writePos > 0)
FlushWrite(false);
// Determine how much space remains in the buffer
int remainingBuffer = _bufferSize - _writePos;
Debug.Assert(remainingBuffer >= 0);

return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
}
// Simple/common case:
// - The write is smaller than our buffer, such that it's worth considering buffering it.
// - There's no active flush operation, such that we don't have to worry about the existing buffer being in use.
// - And the data we're trying to write fits in the buffer, meaning it wasn't already filled by previous writes.
// In that case, just store it in the buffer.
if (numBytes < _bufferSize && !HasActiveBufferOperation && numBytes <= remainingBuffer)
{
if (_buffer == null)
_buffer = new byte[_bufferSize];

// Handle buffering.
if (_writePos == 0)
{
if (_readPos < _readLen) FlushRead();
_readPos = 0;
_readLen = 0;
Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
_writePos += numBytes;
writeDataStoredInBuffer = true;

// There is one special-but-common case, common because devs often use
// byte[] sizes that are powers of 2 and thus fit nicely into our buffer, which is
// also a power of 2. If after our write the buffer still has remaining space,
// then we're done and can return a completed task now. But if we filled the buffer
// completely, we want to do the asynchronous flush/write as part of this operation
// rather than waiting until the next write that fills the buffer.
if (numBytes != remainingBuffer)
return Task.CompletedTask;

Debug.Assert(_writePos == _bufferSize);
}
}

int n = _bufferSize - _writePos;
if (numBytes <= n)
// At this point, at least one of the following is true:
// 1. There was an active flush operation (it could have completed by now, though).
// 2. The data doesn't fit in the remaining buffer (or it's a pipe and we chose not to try).
// 3. We wrote all of the data to the buffer, filling it.
//
// If there's an active operation, we can't touch the current buffer because it's in use.
// That gives us a choice: we can either allocate a new buffer, or we can skip the buffer
// entirely (even if the data would otherwise fit in it). For now, for simplicity, we do
// the latter; it could also have performance wins due to OS-level optimizations, and we could
// potentially add support for PreAllocatedOverlapped due to having a single buffer. (We can
// switch to allocating a new buffer, potentially experimenting with buffer pooling, should
// performance data suggest it's appropriate.)
//
// If the data doesn't fit in the remaining buffer, it could be because it's so large
// it's greater than the entire buffer size, in which case we'd always skip the buffer,
// or it could be because there's more data than just the space remaining. For the latter
// case, we need to issue an asynchronous write to flush that data, which then turns this into
// the first case above with an active operation.
//
// If we already stored the data, then we have nothing additional to write beyond what
// we need to flush.
//
// In any of these cases, we have the same outcome:
// - If there's data in the buffer, flush it by writing it out asynchronously.
// - Then, if there's any data to be written, issue a write for it concurrently.
// We return a Task that represents one or both.

// Flush the buffer asynchronously if there's anything to flush
Task flushTask = null;
if (_writePos > 0)
{
if (_writePos == 0) _buffer = new byte[_bufferSize];
Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
_writePos += numBytes;

return Task.CompletedTask;
flushTask = FlushWriteAsync(cancellationToken);

// If we already copied all of the data into the buffer,
// simply return the flush task here. Same goes for if the task has
// already completed and was unsuccessful.
if (writeDataStoredInBuffer ||
flushTask.IsFaulted ||
flushTask.IsCanceled)
{
return flushTask;
}
}

if (_writePos > 0)
FlushWrite(false);
Debug.Assert(!writeDataStoredInBuffer);
Debug.Assert(_writePos == 0);

return WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
// Finally, issue the write asynchronously, and return a Task that logically
// represents the write operation, including any flushing done.
Task writeTask = WriteInternalCoreAsync(array, offset, numBytes, cancellationToken);
return
(flushTask == null || flushTask.Status == TaskStatus.RanToCompletion) ? writeTask :
(writeTask.Status == TaskStatus.RanToCompletion) ? flushTask :
Task.WhenAll(flushTask, writeTask);
}

[System.Security.SecuritySafeCritical] // auto-generated
Expand Down Expand Up @@ -1395,7 +1475,7 @@ private unsafe Task WriteInternalCoreAsync(byte[] bytes, int offset, int numByte
throw Win32Marshal.GetExceptionForWin32Error(errorCode);
}
}
else
else // ERROR_IO_PENDING
{
// Only once the IO is pending do we register for cancellation
completionSource.RegisterForCancellation();
Expand Down

This file was deleted.

Loading

0 comments on commit 326e7d7

Please sign in to comment.