-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add ArcBuffer support to Orleans serialization #10066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -194,6 +194,15 @@ public static class Reader | |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public static Reader<BufferSliceReaderInput> Create(BufferSlice input, SerializerSession session) => new(new BufferSliceReaderInput(in input), session, 0); | ||
|
|
||
| /// <summary> | ||
| /// Creates a reader for the provided buffer. | ||
| /// </summary> | ||
| /// <param name="input">The input.</param> | ||
| /// <param name="session">The session.</param> | ||
| /// <returns>A new <see cref="Reader{TInput}"/>.</returns> | ||
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public static Reader<ArcBufferReaderInput> Create(ArcBuffer input, SerializerSession session) => new(new ArcBufferReaderInput(in input), session, 0); | ||
|
Comment on lines
+197
to
+204
|
||
|
|
||
| /// <summary> | ||
| /// Creates a reader for the provided input stream. | ||
| /// </summary> | ||
|
|
@@ -267,6 +276,7 @@ public ref struct Reader<TInput> | |
| private readonly static bool IsReadOnlySequenceInput = typeof(TInput) == typeof(ReadOnlySequenceInput); | ||
| private readonly static bool IsReaderInput = typeof(ReaderInput).IsAssignableFrom(typeof(TInput)); | ||
| private readonly static bool IsBufferSliceInput = typeof(TInput) == typeof(BufferSliceReaderInput); | ||
| private readonly static bool IsArcBufferInput = typeof(TInput) == typeof(ArcBufferReaderInput); | ||
|
|
||
| private ReadOnlySpan<byte> _currentSpan; | ||
| private int _bufferPos; | ||
|
|
@@ -297,6 +307,15 @@ internal Reader(TInput input, SerializerSession session, long globalOffset) | |
| _bufferSize = _currentSpan.Length; | ||
| _sequenceOffset = globalOffset; | ||
| } | ||
| else if (IsArcBufferInput) | ||
| { | ||
| _input = input; | ||
| ref var slice = ref Unsafe.As<TInput, ArcBufferReaderInput>(ref _input); | ||
| _currentSpan = slice.GetNext(); | ||
| _bufferPos = 0; | ||
| _bufferSize = _currentSpan.Length; | ||
| _sequenceOffset = globalOffset; | ||
| } | ||
| else if (IsReaderInput) | ||
| { | ||
| _input = input; | ||
|
|
@@ -357,6 +376,11 @@ public long Position | |
| var previousBuffersSize = Unsafe.As<TInput, BufferSliceReaderInput>(ref _input).PreviousBuffersSize; | ||
| return _sequenceOffset + previousBuffersSize + _bufferPos; | ||
| } | ||
| else if (IsArcBufferInput) | ||
| { | ||
| var previousBuffersSize = Unsafe.As<TInput, ArcBufferReaderInput>(ref _input).PreviousBuffersSize; | ||
| return _sequenceOffset + previousBuffersSize + _bufferPos; | ||
| } | ||
| else if (IsSpanInput) | ||
| { | ||
| return _sequenceOffset + _bufferPos; | ||
|
|
@@ -388,6 +412,10 @@ public long Length | |
| { | ||
| return Unsafe.As<TInput, BufferSliceReaderInput>(ref _input).Length; | ||
| } | ||
| else if (IsArcBufferInput) | ||
| { | ||
| return Unsafe.As<TInput, ArcBufferReaderInput>(ref _input).Length; | ||
| } | ||
| else if (IsSpanInput) | ||
| { | ||
| return _currentSpan.Length; | ||
|
|
@@ -441,6 +469,22 @@ public void Skip(long count) | |
| } | ||
| } | ||
| } | ||
| else if (IsArcBufferInput) | ||
| { | ||
| var end = Position + count; | ||
| while (Position < end) | ||
| { | ||
| var previousBuffersSize = Unsafe.As<TInput, ArcBufferReaderInput>(ref _input).PreviousBuffersSize; | ||
| if (end - previousBuffersSize <= _bufferSize) | ||
| { | ||
| _bufferPos = (int)(end - previousBuffersSize); | ||
| } | ||
| else | ||
| { | ||
| MoveNext(); | ||
| } | ||
| } | ||
| } | ||
| else if (IsSpanInput) | ||
| { | ||
| _bufferPos += (int)count; | ||
|
|
@@ -496,6 +540,17 @@ public void ForkFrom(long position, out Reader<TInput> forked) | |
| ThrowInvalidPosition(position, forked.Position); | ||
| } | ||
| } | ||
| else if (IsArcBufferInput) | ||
| { | ||
| ref var input = ref Unsafe.As<TInput, ArcBufferReaderInput>(ref _input); | ||
| var newInput = input.ForkFrom(checked((int)position)); | ||
| forked = new Reader<TInput>(Unsafe.As<ArcBufferReaderInput, TInput>(ref newInput), Session, position); | ||
|
|
||
| if (forked.Position != position) | ||
| { | ||
| ThrowInvalidPosition(position, forked.Position); | ||
| } | ||
| } | ||
| else if (IsSpanInput) | ||
| { | ||
| forked = new Reader<TInput>(_currentSpan[(int)position..], Session, position); | ||
|
|
@@ -541,6 +596,10 @@ public void ResumeFrom(long position) | |
| { | ||
| // Nothing is required. | ||
| } | ||
| else if (IsArcBufferInput) | ||
| { | ||
| // Nothing is required. | ||
| } | ||
| else if (IsSpanInput) | ||
| { | ||
| // Nothing is required. | ||
|
|
@@ -598,6 +657,14 @@ private void MoveNext() | |
| _bufferPos = 0; | ||
| _bufferSize = _currentSpan.Length; | ||
| } | ||
| else if (IsArcBufferInput) | ||
| { | ||
| ref var slice = ref Unsafe.As<TInput, ArcBufferReaderInput>(ref _input); | ||
| slice.PreviousBuffersSize += _bufferSize; | ||
| _currentSpan = slice.GetNext(); | ||
| _bufferPos = 0; | ||
| _bufferSize = _currentSpan.Length; | ||
| } | ||
| else if (IsSpanInput) | ||
| { | ||
| ThrowInsufficientData(); | ||
|
|
@@ -615,7 +682,7 @@ private void MoveNext() | |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public byte ReadByte() | ||
| { | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| var pos = _bufferPos; | ||
| if ((uint)pos < (uint)_currentSpan.Length) | ||
|
|
@@ -657,7 +724,7 @@ private static byte ReadByteSlow(ref Reader<TInput> reader) | |
| /// <returns>The <see cref="uint"/> which was read.</returns> | ||
| public uint ReadUInt32() | ||
| { | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| const int width = 4; | ||
| if (_bufferPos + width > _bufferSize) | ||
|
|
@@ -701,7 +768,7 @@ static uint ReadSlower(ref Reader<TInput> r) | |
| /// <returns>The <see cref="ulong"/> which was read.</returns> | ||
| public ulong ReadUInt64() | ||
| { | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| const int width = 8; | ||
| if (_bufferPos + width > _bufferSize) | ||
|
|
@@ -779,7 +846,7 @@ public byte[] ReadBytes(uint count) | |
| } | ||
|
|
||
| var bytes = new byte[count]; | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| var destination = new Span<byte>(bytes); | ||
| ReadBytes(destination); | ||
|
|
@@ -798,7 +865,7 @@ public byte[] ReadBytes(uint count) | |
| /// <param name="destination">The destination.</param> | ||
| public void ReadBytes(scoped Span<byte> destination) | ||
| { | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| if (_bufferPos + destination.Length <= _bufferSize) | ||
| { | ||
|
|
@@ -846,7 +913,7 @@ private void ReadBytesMultiSegment(scoped Span<byte> dest) | |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public bool TryReadBytes(int length, out ReadOnlySpan<byte> bytes) | ||
| { | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| if (_bufferPos + length <= _bufferSize) | ||
| { | ||
|
|
@@ -879,7 +946,7 @@ public bool TryReadBytes(int length, out ReadOnlySpan<byte> bytes) | |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public unsafe uint ReadVarUInt32() | ||
| { | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| var pos = _bufferPos; | ||
|
|
||
|
|
@@ -937,7 +1004,7 @@ private uint ReadVarUInt32Slow() | |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public ulong ReadVarUInt64() | ||
| { | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) | ||
| if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) | ||
| { | ||
| var pos = _bufferPos; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,15 @@ public static class Writer | |
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public static Writer<TBufferWriter> Create<TBufferWriter>(TBufferWriter destination, SerializerSession session) where TBufferWriter : IBufferWriter<byte> => new(destination, session); | ||
|
|
||
| /// <summary> | ||
| /// Creates a writer which writes to the specified destination. | ||
| /// </summary> | ||
| /// <param name="destination">The destination.</param> | ||
| /// <param name="session">The session.</param> | ||
| /// <returns>A new <see cref="Writer{TBufferWriter}"/>.</returns> | ||
| [MethodImpl(MethodImplOptions.AggressiveInlining)] | ||
| public static Writer<ArcBufferWriterWrapper> Create(ArcBufferWriter destination, SerializerSession session) => new(new(destination), session); | ||
|
|
||
|
Comment on lines
+33
to
+41
|
||
| /// <summary> | ||
| /// Creates a writer which writes to the specified destination. | ||
| /// </summary> | ||
|
|
@@ -95,6 +104,25 @@ public static class Writer | |
| public static Writer<PooledBuffer> CreatePooled(SerializerSession session) => new(new PooledBuffer(), session); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Wraps an <see cref="ArcBufferWriter"/> for use as a serialization writer target. | ||
| /// </summary> | ||
| /// <remarks> | ||
| /// Disposing a <see cref="Writer{TBufferWriter}"/> over this wrapper does not dispose the underlying <see cref="ArcBufferWriter"/>. | ||
| /// </remarks> | ||
| /// <param name="bufferWriter">The wrapped buffer writer.</param> | ||
| public readonly struct ArcBufferWriterWrapper(ArcBufferWriter bufferWriter) : IBufferWriter<byte> | ||
| { | ||
| /// <inheritdoc/> | ||
| public void Advance(int count) => ((IBufferWriter<byte>)bufferWriter).Advance(count); | ||
|
|
||
| /// <inheritdoc/> | ||
| public Memory<byte> GetMemory(int sizeHint = 0) => bufferWriter.GetMemory(sizeHint); | ||
|
|
||
| /// <inheritdoc/> | ||
| public Span<byte> GetSpan(int sizeHint = 0) => bufferWriter.GetSpan(sizeHint); | ||
| } | ||
|
Comment on lines
+111
to
+124
|
||
|
|
||
| /// <summary> | ||
| /// Provides functionality for writing to an output stream. | ||
| /// </summary> | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArcBufferReaderInputis a new public type, but it is not present in the committed API baseline (src/api/Orleans.Serialization/Orleans.Serialization.cs). Update the generated API surface file so this type is captured for API review.This issue also appears on line 121 of the same file.