Skip to content
This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Permalink
Grow the pooled buffer exponentially (double) rather than linearly. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ahsonkhan authored Oct 25, 2018
1 parent c07fe64 commit fe117a6
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 95 deletions.
153 changes: 115 additions & 38 deletions src/System.Text.JsonLab/System/Text/Json/Utf8JsonReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Buffers.Text;
using System.Collections.Generic;
using System.Diagnostics;

using static System.Text.JsonLab.JsonThrowHelper;

namespace System.Text.JsonLab
Expand All @@ -15,6 +16,8 @@ public ref partial struct Utf8JsonReader
// We are using a ulong to represent our nested state, so we can only go 64 levels deep.
internal const int StackFreeMaxDepth = sizeof(ulong) * 8;

const int MinimumSegmentSize = 4_096;

private ReadOnlySpan<byte> _buffer;

private int _consumed;
Expand Down Expand Up @@ -338,65 +341,139 @@ public bool Read()

private bool ReadNextSegment()
{
if (_pooledArray == null)
return false;

bool result = false;

Debug.Assert(!_isLastSegment);

do
{
ReadOnlyMemory<byte> memory = default;
while (true)
if (_consumed == 0)
{
bool noMoreData = !_data.TryGet(ref _nextPosition, out memory, advance: true);
if (noMoreData)
if (!_data.TryGet(ref _nextPosition, out _, advance: false))
return false;
if (memory.Length != 0)
break;
CopyNextSequences();
}
else
{
ReadOnlyMemory<byte> memory = default;
while (true)
{
bool noMoreData = !_data.TryGet(ref _nextPosition, out memory, advance: true);
if (noMoreData)
return false;
if (memory.Length != 0)
break;
}

if (_isFinalBlock)
_isLastSegment = !_data.TryGet(ref _nextPosition, out _, advance: false);
if (_isFinalBlock)
_isLastSegment = !_data.TryGet(ref _nextPosition, out _, advance: false);

ReadOnlySpan<byte> leftOver = default;
if (_consumed < _buffer.Length)
{
leftOver = _buffer.Slice(_consumed);
CopyLeftOverAndNext(memory);
}

// TODO: Should this be a settable property?
if (leftOver.Length >= 1_000_000)
result = ReadSingleSegment();
} while (!result && !_isLastSegment);
return result;
}

private void CopyLeftOverAndNext(ReadOnlyMemory<byte> memory)
{
if (_consumed < _buffer.Length)
{
ReadOnlySpan<byte> leftOver = _buffer.Slice(_consumed);

if (leftOver.Length > _buffer.Length - memory.Length)
{
// A single JSON token exceeds 1 MB in size. In such a rare case, allocate.
// TODO: Slice based on current SequencePosition
_buffer = _data.Slice(Consumed).ToArray();
if (_isFinalBlock)
_isLastSegment = true;
if (leftOver.Length > int.MaxValue - memory.Length)
ThrowArgumentException("Current sequence segment size is too large to fit left over data from the previous segment into a 2 GB buffer.");

// This is guaranteed to not overflow due to the check above.
ResizeBuffer(leftOver.Length + memory.Length);
}
else
{
if (leftOver.Length > _buffer.Length - memory.Length)
{
if (leftOver.Length > int.MaxValue - memory.Length)
ThrowArgumentException("Current sequence segment size is too large to fit left over data from the previous segment into a 2 GB buffer.");

ResizeBuffer(leftOver.Length + memory.Length);
}
// This is guaranteed to not overflow
// -> _buffer.Length <= int.MaxValue, since it needs to fit in a span.
// -> leftOver.Length < _buffer.Length, since leftOver is a slice of _buffer and _consumed != 0
// -> if leftOver.Length + memory.Length > _buffer.Length
// ->-> we check if leftOver.Length + memory.Length > int.MaxValue and throw above
// -> therefore, leftOver.Length + memory.Length < _buffer.Length <= int.MaxValue
Span<byte> bufferSpan = _pooledArray.AsSpan(0, leftOver.Length + memory.Length);
leftOver.CopyTo(bufferSpan);
memory.Span.CopyTo(bufferSpan.Slice(leftOver.Length));
_buffer = bufferSpan;
bufferSpan = default;
}
else
{
if (_buffer.Length < memory.Length)
ResizeBuffer(memory.Length);

Span<byte> bufferSpan = _pooledArray;
leftOver.CopyTo(bufferSpan);
Span<byte> bufferSpan = _pooledArray.AsSpan(0, memory.Length);
memory.Span.CopyTo(bufferSpan);
_buffer = bufferSpan;
bufferSpan = default;
}

memory.Span.CopyTo(bufferSpan.Slice(leftOver.Length));
bufferSpan = bufferSpan.Slice(0, leftOver.Length + memory.Length); // This is guaranteed to not overflow
_totalConsumed += _consumed;
_consumed = 0;
}

_totalConsumed += _consumed;
_consumed = 0;
private void CopyNextSequences()
{
Debug.Assert(_consumed == 0);

_buffer = bufferSpan;
}
// TODO: Should we try to support more than 1 GB?
if (_buffer.Length > 1_000_000_000)
ThrowArgumentException("Current sequence segment size is too large to fit left over data from the previous segment into a 2 GB buffer.");

result = ReadSingleSegment();
} while (!result && !_isLastSegment);
return result;
// TODO: Always double or is it useful to have MinimumSegmentSize?
// This is guaranteed to not overflow since _buffer.Length <= 1 billion
int minSize = Math.Max(_buffer.Length * 2, MinimumSegmentSize);
if (_pooledArray.Length <= minSize)
ResizeBuffer(minSize);

Span<byte> bufferSpan = _pooledArray;
_buffer.CopyTo(bufferSpan);
bufferSpan = bufferSpan.Slice(_buffer.Length);

int copied = 0;
while (true)
{
SequencePosition prevNextPosition = _nextPosition;
if (_data.TryGet(ref _nextPosition, out ReadOnlyMemory<byte> memory, advance: true))
{
if (memory.Length == 0)
continue;

ReadOnlySpan<byte> currentSpan = memory.Span;
if (!currentSpan.TryCopyTo(bufferSpan.Slice(copied)))
{
_nextPosition = prevNextPosition;
break;
}

// This is guaranteed to not overflow:
// -> _pooledArray.Length <= int.MaxValue, therefore bufferSpan.Length <= int.MaxValue
// -> bufferSpan.Length - copied >= currentSpan.Length, since currentSpan.TryCopyTo succeeded
// -> int.MaxValue - copied >= currentSpan.Length, assuming maximum bufferSpan.Length possible
// -> int.MaxValue >= currentSpan.Length + copied
copied += currentSpan.Length;
prevNextPosition = _nextPosition;
}
else
{
_nextPosition = prevNextPosition;
if (_isFinalBlock)
_isLastSegment = true;
break;
}
}
bufferSpan = default;
Debug.Assert(_buffer.Length <= int.MaxValue - copied);
_buffer = _pooledArray.AsSpan(0, _buffer.Length + copied); // This is guaranteed to not overflow
}

public void Skip()
Expand Down
62 changes: 27 additions & 35 deletions src/System.Text.JsonLab/System/Text/Json/Utf8JsonReaderStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
// See the LICENSE file in the project root for more information.

using System.Buffers;
using System.Diagnostics;
using System.IO;

namespace System.Text.JsonLab
{
public ref struct Utf8JsonReaderStream
{
private Utf8JsonReader _jsonReader;
private Span<byte> _span;
private Span<byte> _buffer;
private Stream _stream;
private byte[] _buffer;
private byte[] _pooledArray;
private bool _isFinalBlock;
private long _consumed;

Expand All @@ -28,13 +29,13 @@ public Utf8JsonReaderStream(Stream jsonStream)
if (!jsonStream.CanRead || !jsonStream.CanSeek)
JsonThrowHelper.ThrowArgumentException("Stream must be readable and seekable.");

_buffer = ArrayPool<byte>.Shared.Rent(FirstSegmentSize);
int numberOfBytes = jsonStream.Read(_buffer, 0, FirstSegmentSize);
_span = _buffer.AsSpan(0, numberOfBytes);
_pooledArray = ArrayPool<byte>.Shared.Rent(FirstSegmentSize);
int numberOfBytes = jsonStream.Read(_pooledArray, 0, FirstSegmentSize);
_buffer = _pooledArray.AsSpan(0, numberOfBytes);
_stream = jsonStream;

_isFinalBlock = numberOfBytes == 0;
_jsonReader = new Utf8JsonReader(_span, _isFinalBlock);
_jsonReader = new Utf8JsonReader(_buffer, _isFinalBlock);
_consumed = 0;
}

Expand All @@ -55,46 +56,32 @@ private bool ReadNext()
do
{
_consumed += _jsonReader.Consumed;
int leftOver = _span.Length - (int)_jsonReader.Consumed;
int leftOver = _buffer.Length - (int)_jsonReader.Consumed;
int amountToRead = StreamSegmentSize;
if (leftOver > 0)
{
_stream.Position -= leftOver;

// TODO: Should this be a settable property?
if (leftOver >= 1_000_000)
if (_jsonReader.Consumed == 0)
{
// A single JSON token exceeds 1 MB in size. Start doubling.
if (leftOver > 1_000_000_000)
amountToRead = 2_000_000_000;
else
amountToRead = leftOver * 2;

if (leftOver >= 2_000_000_000)
JsonThrowHelper.ThrowArgumentException("Cannot fit left over data from the previous chunk and the next chunk of data into a 2 GB buffer.");
}
else
{
if (_jsonReader.Consumed == 0)
{
if (leftOver > int.MaxValue - amountToRead)
JsonThrowHelper.ThrowArgumentException("Cannot fit left over data from the previous chunk and the next chunk of data into a 2 GB buffer.");

amountToRead += leftOver; // This is guaranteed to not overflow
ResizeBuffer(amountToRead);
}

// This is guaranteed to not overflow due to the check above.
amountToRead += leftOver * 2;
ResizeBuffer(amountToRead);
}
}

if (_buffer.Length < amountToRead)
ResizeBuffer(amountToRead);
if (_pooledArray.Length < StreamSegmentSize)
ResizeBuffer(StreamSegmentSize);

int numberOfBytes = _stream.Read(_buffer, 0, amountToRead);
int numberOfBytes = _stream.Read(_pooledArray, 0, amountToRead);
_isFinalBlock = numberOfBytes == 0; // TODO: Can this be inferred differently based on leftOver and numberOfBytes

_span = _buffer.AsSpan(0, numberOfBytes);
_buffer = _pooledArray.AsSpan(0, numberOfBytes);

_jsonReader = new Utf8JsonReader(_span, _isFinalBlock, _jsonReader.State);
_jsonReader = new Utf8JsonReader(_buffer, _isFinalBlock, _jsonReader.State);
result = _jsonReader.Read();
} while (!result && !_isFinalBlock);

Expand All @@ -103,14 +90,19 @@ private bool ReadNext()

private void ResizeBuffer(int minimumSize)
{
ArrayPool<byte>.Shared.Return(_buffer);
_buffer = ArrayPool<byte>.Shared.Rent(minimumSize);
Debug.Assert(minimumSize > 0);
Debug.Assert(_pooledArray != null);
ArrayPool<byte>.Shared.Return(_pooledArray);
_pooledArray = ArrayPool<byte>.Shared.Rent(minimumSize);
}

public void Dispose()
{
ArrayPool<byte>.Shared.Return(_buffer);
_buffer = null;
if (_pooledArray != null)
{
ArrayPool<byte>.Shared.Return(_pooledArray);
_pooledArray = null;
}
}
}
}
Loading

0 comments on commit fe117a6

Please sign in to comment.