Skip to content
This repository has been archived by the owner on Nov 20, 2018. It is now read-only.

Commit

Permalink
#553 Use System.Buffers for temporary arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
Tratcher committed Mar 28, 2016
1 parent 1b71748 commit bd60507
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 113 deletions.
3 changes: 2 additions & 1 deletion src/Microsoft.AspNetCore.Http/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"Microsoft.AspNetCore.Http.Abstractions": "1.0.0-*",
"Microsoft.AspNetCore.WebUtilities": "1.0.0-*",
"Microsoft.Extensions.ObjectPool": "1.0.0-*",
"Microsoft.Net.Http.Headers": "1.0.0-*"
"Microsoft.Net.Http.Headers": "1.0.0-*",
"System.Buffers": "4.0.0-*"
},
"frameworks": {
"net451": { },
Expand Down
17 changes: 13 additions & 4 deletions src/Microsoft.AspNetCore.Owin/WebSockets/OwinWebSocketAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -33,6 +34,7 @@ namespace Microsoft.AspNetCore.Owin

public class OwinWebSocketAdapter : WebSocket
{
private const int _rentedBufferSize = 1024;
private IDictionary<string, object> _websocketContext;
private WebSocketSendAsync _sendAsync;
private WebSocketReceiveAsync _receiveAsync;
Expand Down Expand Up @@ -126,11 +128,18 @@ public override async Task CloseAsync(WebSocketCloseStatus closeStatus, string s
await CloseOutputAsync(closeStatus, statusDescription, cancellationToken);
}

byte[] buffer = new byte[1024];
while (State == WebSocketState.CloseSent)
var buffer = ArrayPool<byte>.Shared.Rent(_rentedBufferSize);
try
{
// Drain until close received
await ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
while (State == WebSocketState.CloseSent)
{
// Drain until close received
await ReceiveAsync(new ArraySegment<byte>(buffer), cancellationToken);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

Expand Down
21 changes: 17 additions & 4 deletions src/Microsoft.AspNetCore.WebUtilities/BufferedReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.IO;
using System.Text;
using System.Threading;
Expand All @@ -16,19 +17,26 @@ internal class BufferedReadStream : Stream

private readonly Stream _inner;
private readonly byte[] _buffer;
private readonly ArrayPool<byte> _bytePool;
private int _bufferOffset = 0;
private int _bufferCount = 0;
private bool _disposed;

public BufferedReadStream(Stream inner, int bufferSize)
: this(inner, bufferSize, ArrayPool<byte>.Shared)
{
}

public BufferedReadStream(Stream inner, int bufferSize, ArrayPool<byte> bytePool)
{
if (inner == null)
{
throw new ArgumentNullException(nameof(inner));
}

_inner = inner;
_buffer = new byte[bufferSize];
_bytePool = bytePool;
_buffer = bytePool.Rent(bufferSize);
}

public ArraySegment<byte> BufferedData
Expand Down Expand Up @@ -128,10 +136,15 @@ public override void SetLength(long value)

protected override void Dispose(bool disposing)
{
_disposed = true;
if (disposing)
if (!_disposed)
{
_inner.Dispose();
_disposed = true;
_bytePool.Return(_buffer);

if (disposing)
{
_inner.Dispose();
}
}
}

Expand Down
71 changes: 61 additions & 10 deletions src/Microsoft.AspNetCore.WebUtilities/FileBufferingReadStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Threading;
Expand All @@ -16,12 +17,15 @@ namespace Microsoft.AspNetCore.WebUtilities
/// </summary>
public class FileBufferingReadStream : Stream
{
private const int _maxRentedBufferSize = 1024 * 1024; // 1MB
private readonly Stream _inner;
private readonly ArrayPool<byte> _bytePool;
private readonly int _memoryThreshold;
private string _tempFileDirectory;
private readonly Func<string> _tempFileDirectoryAccessor;

private Stream _buffer = new MemoryStream(); // TODO: We could have a more efficiently expanding buffer stream.
private Stream _buffer;
private byte[] _rentedBuffer;
private bool _inMemory = true;
private bool _completelyBuffered;

Expand All @@ -32,6 +36,15 @@ public FileBufferingReadStream(
Stream inner,
int memoryThreshold,
Func<string> tempFileDirectoryAccessor)
: this(inner, memoryThreshold, tempFileDirectoryAccessor, ArrayPool<byte>.Shared)
{
}

public FileBufferingReadStream(
Stream inner,
int memoryThreshold,
Func<string> tempFileDirectoryAccessor,
ArrayPool<byte> bytePool)
{
if (inner == null)
{
Expand All @@ -43,13 +56,34 @@ public FileBufferingReadStream(
throw new ArgumentNullException(nameof(tempFileDirectoryAccessor));
}

_bytePool = bytePool;
if (memoryThreshold < _maxRentedBufferSize)
{
_rentedBuffer = bytePool.Rent(memoryThreshold);
_buffer = new MemoryStream(_rentedBuffer);
_buffer.SetLength(0);
}
else
{
_buffer = new MemoryStream();
}

_inner = inner;
_memoryThreshold = memoryThreshold;
_tempFileDirectoryAccessor = tempFileDirectoryAccessor;
}

// TODO: allow for an optional buffer size limit to prevent filling hard disks. 1gb?
public FileBufferingReadStream(Stream inner, int memoryThreshold, string tempFileDirectory)
: this(inner, memoryThreshold, tempFileDirectory, ArrayPool<byte>.Shared)
{
}

public FileBufferingReadStream(
Stream inner,
int memoryThreshold,
string tempFileDirectory,
ArrayPool<byte> bytePool)
{
if (inner == null)
{
Expand All @@ -61,6 +95,18 @@ public FileBufferingReadStream(Stream inner, int memoryThreshold, string tempFil
throw new ArgumentNullException(nameof(tempFileDirectory));
}

_bytePool = bytePool;
if (memoryThreshold < _maxRentedBufferSize)
{
_rentedBuffer = bytePool.Rent(memoryThreshold);
_buffer = new MemoryStream(_rentedBuffer);
_buffer.SetLength(0);
}
else
{
_buffer = new MemoryStream();
}

_inner = inner;
_memoryThreshold = memoryThreshold;
_tempFileDirectory = tempFileDirectory;
Expand Down Expand Up @@ -145,11 +191,11 @@ public override int Read(byte[] buffer, int offset, int count)

if (_inMemory && _buffer.Length + read > _memoryThreshold)
{
var oldBuffer = _buffer;
_buffer = CreateTempFile();
_inMemory = false;
oldBuffer.Position = 0;
oldBuffer.CopyTo(_buffer, 1024 * 16);
_buffer = CreateTempFile();
_buffer.Write(_rentedBuffer, 0, (int)_buffer.Length);
_bytePool.Return(_rentedBuffer);
_rentedBuffer = null;
}

if (read > 0)
Expand Down Expand Up @@ -216,11 +262,11 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,

if (_inMemory && _buffer.Length + read > _memoryThreshold)
{
var oldBuffer = _buffer;
_buffer = CreateTempFile();
_inMemory = false;
oldBuffer.Position = 0;
await oldBuffer.CopyToAsync(_buffer, 1024 * 16, cancellationToken);
_buffer = CreateTempFile();
await _buffer.WriteAsync(_rentedBuffer, 0, (int)_buffer.Length, cancellationToken);
_bytePool.Return(_rentedBuffer);
_rentedBuffer = null;
}

if (read > 0)
Expand Down Expand Up @@ -270,6 +316,11 @@ protected override void Dispose(bool disposing)
if (!_disposed)
{
_disposed = true;
if (_rentedBuffer != null)
{
_bytePool.Return(_rentedBuffer);
}

if (disposing)
{
_buffer.Dispose();
Expand All @@ -285,4 +336,4 @@ private void ThrowIfDisposed()
}
}
}
}
}
67 changes: 48 additions & 19 deletions src/Microsoft.AspNetCore.WebUtilities/FormReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Text;
Expand All @@ -14,25 +15,40 @@ namespace Microsoft.AspNetCore.WebUtilities
/// <summary>
/// Used to read an 'application/x-www-form-urlencoded' form.
/// </summary>
public class FormReader
public class FormReader : IDisposable
{
private const int _rentedCharPoolLength = 8192;
private readonly TextReader _reader;
private readonly char[] _buffer = new char[1024];
private readonly char[] _buffer;
private readonly ArrayPool<char> _charPool;
private readonly StringBuilder _builder = new StringBuilder();
private int _bufferOffset;
private int _bufferCount;
private bool _disposed;

public FormReader(string data)
: this(data, ArrayPool<char>.Shared)
{
}

public FormReader(string data, ArrayPool<char> charPool)
{
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}

_buffer = charPool.Rent(_rentedCharPoolLength);
_charPool = charPool;
_reader = new StringReader(data);
}

public FormReader(Stream stream, Encoding encoding)
: this(stream, encoding, ArrayPool<char>.Shared)
{
}

public FormReader(Stream stream, Encoding encoding, ArrayPool<char> charPool)
{
if (stream == null)
{
Expand All @@ -44,6 +60,8 @@ public FormReader(Stream stream, Encoding encoding)
throw new ArgumentNullException(nameof(encoding));
}

_buffer = charPool.Rent(_rentedCharPoolLength);
_charPool = charPool;
_reader = new StreamReader(stream, encoding, detectEncodingFromByteOrderMarks: true, bufferSize: 1024 * 2, leaveOpen: true);
}

Expand Down Expand Up @@ -167,17 +185,18 @@ private async Task BufferAsync(CancellationToken cancellationToken)
/// <returns>The collection containing the parsed HTTP form body.</returns>
public static Dictionary<string, StringValues> ReadForm(string text)
{
var reader = new FormReader(text);

var accumulator = new KeyValueAccumulator();
var pair = reader.ReadNextPair();
while (pair.HasValue)
using (var reader = new FormReader(text))
{
accumulator.Append(pair.Value.Key, pair.Value.Value);
pair = reader.ReadNextPair();
}
var accumulator = new KeyValueAccumulator();
var pair = reader.ReadNextPair();
while (pair.HasValue)
{
accumulator.Append(pair.Value.Key, pair.Value.Value);
pair = reader.ReadNextPair();
}

return accumulator.GetResults();
return accumulator.GetResults();
}
}

/// <summary>
Expand All @@ -200,17 +219,27 @@ public static Dictionary<string, StringValues> ReadForm(string text)
/// <returns>The collection containing the parsed HTTP form body.</returns>
public static async Task<Dictionary<string, StringValues>> ReadFormAsync(Stream stream, Encoding encoding, CancellationToken cancellationToken = new CancellationToken())
{
var reader = new FormReader(stream, encoding);

var accumulator = new KeyValueAccumulator();
var pair = await reader.ReadNextPairAsync(cancellationToken);
while (pair.HasValue)
using (var reader = new FormReader(stream, encoding))
{
accumulator.Append(pair.Value.Key, pair.Value.Value);
pair = await reader.ReadNextPairAsync(cancellationToken);
var accumulator = new KeyValueAccumulator();
var pair = await reader.ReadNextPairAsync(cancellationToken);
while (pair.HasValue)
{
accumulator.Append(pair.Value.Key, pair.Value.Value);
pair = await reader.ReadNextPairAsync(cancellationToken);
}

return accumulator.GetResults();
}
}

return accumulator.GetResults();
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
_charPool.Return(_buffer);
}
}
}
}
Loading

0 comments on commit bd60507

Please sign in to comment.