diff --git a/src/Orleans.Serialization.TestKit/FieldCodecTester.cs b/src/Orleans.Serialization.TestKit/FieldCodecTester.cs index bdb207d1bb6..00981d6eeca 100644 --- a/src/Orleans.Serialization.TestKit/FieldCodecTester.cs +++ b/src/Orleans.Serialization.TestKit/FieldCodecTester.cs @@ -971,27 +971,28 @@ protected T RoundTripThroughCodec(T original) protected object RoundTripThroughUntypedSerializer(object original, out string formattedBitStream) { object result; - using (var readerSession = SessionPool.GetSession()) - using (var writeSession = SessionPool.GetSession()) + using var readerSession = SessionPool.GetSession(); + using var writeSession = SessionPool.GetSession(); + + using var bufferWriter = new ArcBufferWriter(); + var writer = Writer.Create(bufferWriter, writeSession); + try { - var writer = Writer.CreatePooled(writeSession); - try - { - var serializer = ServiceProvider.GetService>(); - serializer.Serialize(original, ref writer); + var serializer = ServiceProvider.GetService>(); + serializer.Serialize(original, ref writer); - using var analyzerSession = SessionPool.GetSession(); - var output = writer.Output.Slice(); - formattedBitStream = BitStreamFormatter.Format(output, analyzerSession); + using var analyzerSession = SessionPool.GetSession(); + using var output = bufferWriter.ConsumeSlice(bufferWriter.Length); + var analyzerReader = Reader.Create(output, analyzerSession); + formattedBitStream = BitStreamFormatter.Format(ref analyzerReader); - var reader = Reader.Create(output, readerSession); + var reader = Reader.Create(output, readerSession); - result = serializer.Deserialize(ref reader); - } - finally - { - writer.Dispose(); - } + result = serializer.Deserialize(ref reader); + } + finally + { + writer.Dispose(); } return result; diff --git a/src/Orleans.Serialization/Buffers/Adaptors/BufferSliceReaderInput.cs b/src/Orleans.Serialization/Buffers/Adaptors/BufferSliceReaderInput.cs index 4b607697100..594599c18ab 100644 --- a/src/Orleans.Serialization/Buffers/Adaptors/BufferSliceReaderInput.cs +++ b/src/Orleans.Serialization/Buffers/Adaptors/BufferSliceReaderInput.cs @@ -110,3 +110,63 @@ internal ReadOnlySpan GetNext() [DoesNotReturn] private static void ThrowInsufficientData() => throw new InvalidOperationException("Insufficient data present in buffer."); } + +/// +/// Input type for to support buffers. +/// +/// +/// Initializes a new instance of the type. +/// +/// The underlying buffer. +public struct ArcBufferReaderInput(in ArcBuffer slice) +{ + private readonly ArcBuffer _slice = slice; + private ArcBufferPage _page = slice.First; + private int _position; + + internal readonly ArcBufferPage First => _slice.First; + internal readonly int Position => _position; + internal readonly int Offset => _slice.Offset; + internal readonly int Length => _slice.Length; + internal long PreviousBuffersSize; + + internal readonly ArcBufferReaderInput ForkFrom(int position) + { + // Rely on the outer buffer being pinned. + var sliced = _slice.UnsafeSlice(position, Length - position); + return new ArcBufferReaderInput(in sliced); + } + + internal ReadOnlySpan GetNext() + { + Debug.Assert(_position <= Length); + if (_page is not null) + { + if (_page == First) + { + Debug.Assert(_position == 0); + var offset = Offset; + var length = Math.Min(Length, _page.Length - offset); + _position += length; + var result = _page.AsSpan(offset, length); + _page = _page.Next; + return result; + } + + if (_position != Length) + { + var length = Math.Min(Length - _position, _page.Length); + _position += length; + var result = _page.AsSpan(0, length); + _page = _page.Next; + return result; + } + } + + ThrowInsufficientData(); + return default; + } + + [DoesNotReturn] + private static void ThrowInsufficientData() => throw new InvalidOperationException("Insufficient data present in buffer."); +} diff --git a/src/Orleans.Serialization/Buffers/Reader.cs b/src/Orleans.Serialization/Buffers/Reader.cs index 0ee6047d1c5..43b23164ab8 100644 --- a/src/Orleans.Serialization/Buffers/Reader.cs +++ b/src/Orleans.Serialization/Buffers/Reader.cs @@ -194,6 +194,15 @@ public static class Reader [MethodImpl(MethodImplOptions.AggressiveInlining)] public static Reader Create(BufferSlice input, SerializerSession session) => new(new BufferSliceReaderInput(in input), session, 0); + /// + /// Creates a reader for the provided buffer. + /// + /// The input. + /// The session. + /// A new . + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static Reader Create(ArcBuffer input, SerializerSession session) => new(new ArcBufferReaderInput(in input), session, 0); + /// /// Creates a reader for the provided input stream. /// @@ -267,6 +276,7 @@ public ref struct Reader private readonly static bool IsReadOnlySequenceInput = typeof(TInput) == typeof(ReadOnlySequenceInput); private readonly static bool IsReaderInput = typeof(ReaderInput).IsAssignableFrom(typeof(TInput)); private readonly static bool IsBufferSliceInput = typeof(TInput) == typeof(BufferSliceReaderInput); + private readonly static bool IsArcBufferInput = typeof(TInput) == typeof(ArcBufferReaderInput); private ReadOnlySpan _currentSpan; private int _bufferPos; @@ -297,6 +307,15 @@ internal Reader(TInput input, SerializerSession session, long globalOffset) _bufferSize = _currentSpan.Length; _sequenceOffset = globalOffset; } + else if (IsArcBufferInput) + { + _input = input; + ref var slice = ref Unsafe.As(ref _input); + _currentSpan = slice.GetNext(); + _bufferPos = 0; + _bufferSize = _currentSpan.Length; + _sequenceOffset = globalOffset; + } else if (IsReaderInput) { _input = input; @@ -357,6 +376,11 @@ public long Position var previousBuffersSize = Unsafe.As(ref _input).PreviousBuffersSize; return _sequenceOffset + previousBuffersSize + _bufferPos; } + else if (IsArcBufferInput) + { + var previousBuffersSize = Unsafe.As(ref _input).PreviousBuffersSize; + return _sequenceOffset + previousBuffersSize + _bufferPos; + } else if (IsSpanInput) { return _sequenceOffset + _bufferPos; @@ -388,6 +412,10 @@ public long Length { return Unsafe.As(ref _input).Length; } + else if (IsArcBufferInput) + { + return Unsafe.As(ref _input).Length; + } else if (IsSpanInput) { return _currentSpan.Length; @@ -441,6 +469,22 @@ public void Skip(long count) } } } + else if (IsArcBufferInput) + { + var end = Position + count; + while (Position < end) + { + var previousBuffersSize = Unsafe.As(ref _input).PreviousBuffersSize; + if (end - previousBuffersSize <= _bufferSize) + { + _bufferPos = (int)(end - previousBuffersSize); + } + else + { + MoveNext(); + } + } + } else if (IsSpanInput) { _bufferPos += (int)count; @@ -496,6 +540,17 @@ public void ForkFrom(long position, out Reader forked) ThrowInvalidPosition(position, forked.Position); } } + else if (IsArcBufferInput) + { + ref var input = ref Unsafe.As(ref _input); + var newInput = input.ForkFrom(checked((int)position)); + forked = new Reader(Unsafe.As(ref newInput), Session, position); + + if (forked.Position != position) + { + ThrowInvalidPosition(position, forked.Position); + } + } else if (IsSpanInput) { forked = new Reader(_currentSpan[(int)position..], Session, position); @@ -541,6 +596,10 @@ public void ResumeFrom(long position) { // Nothing is required. } + else if (IsArcBufferInput) + { + // Nothing is required. + } else if (IsSpanInput) { // Nothing is required. @@ -598,6 +657,14 @@ private void MoveNext() _bufferPos = 0; _bufferSize = _currentSpan.Length; } + else if (IsArcBufferInput) + { + ref var slice = ref Unsafe.As(ref _input); + slice.PreviousBuffersSize += _bufferSize; + _currentSpan = slice.GetNext(); + _bufferPos = 0; + _bufferSize = _currentSpan.Length; + } else if (IsSpanInput) { ThrowInsufficientData(); @@ -615,7 +682,7 @@ private void MoveNext() [MethodImpl(MethodImplOptions.AggressiveInlining)] public byte ReadByte() { - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { var pos = _bufferPos; if ((uint)pos < (uint)_currentSpan.Length) @@ -657,7 +724,7 @@ private static byte ReadByteSlow(ref Reader reader) /// The which was read. public uint ReadUInt32() { - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { const int width = 4; if (_bufferPos + width > _bufferSize) @@ -701,7 +768,7 @@ static uint ReadSlower(ref Reader r) /// The which was read. public ulong ReadUInt64() { - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { const int width = 8; if (_bufferPos + width > _bufferSize) @@ -779,7 +846,7 @@ public byte[] ReadBytes(uint count) } var bytes = new byte[count]; - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { var destination = new Span(bytes); ReadBytes(destination); @@ -798,7 +865,7 @@ public byte[] ReadBytes(uint count) /// The destination. public void ReadBytes(scoped Span destination) { - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { if (_bufferPos + destination.Length <= _bufferSize) { @@ -846,7 +913,7 @@ private void ReadBytesMultiSegment(scoped Span dest) [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TryReadBytes(int length, out ReadOnlySpan bytes) { - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { if (_bufferPos + length <= _bufferSize) { @@ -879,7 +946,7 @@ public bool TryReadBytes(int length, out ReadOnlySpan bytes) [MethodImpl(MethodImplOptions.AggressiveInlining)] public unsafe uint ReadVarUInt32() { - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { var pos = _bufferPos; @@ -937,7 +1004,7 @@ private uint ReadVarUInt32Slow() [MethodImpl(MethodImplOptions.AggressiveInlining)] public ulong ReadVarUInt64() { - if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput) + if (IsReadOnlySequenceInput || IsSpanInput || IsBufferSliceInput || IsArcBufferInput) { var pos = _bufferPos; diff --git a/src/Orleans.Serialization/Buffers/Writer.cs b/src/Orleans.Serialization/Buffers/Writer.cs index 9a17a9259b3..37d54c9afa5 100644 --- a/src/Orleans.Serialization/Buffers/Writer.cs +++ b/src/Orleans.Serialization/Buffers/Writer.cs @@ -30,6 +30,15 @@ public static class Writer [MethodImpl(MethodImplOptions.AggressiveInlining)] public static Writer Create(TBufferWriter destination, SerializerSession session) where TBufferWriter : IBufferWriter => new(destination, session); + /// + /// Creates a writer which writes to the specified destination. + /// + /// The destination. + /// The session. + /// A new . + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static Writer Create(ArcBufferWriter destination, SerializerSession session) => new(new(destination), session); + /// /// Creates a writer which writes to the specified destination. /// @@ -95,6 +104,25 @@ public static class Writer public static Writer CreatePooled(SerializerSession session) => new(new PooledBuffer(), session); } + /// + /// Wraps an for use as a serialization writer target. + /// + /// + /// Disposing a over this wrapper does not dispose the underlying . + /// + /// The wrapped buffer writer. + public readonly struct ArcBufferWriterWrapper(ArcBufferWriter bufferWriter) : IBufferWriter + { + /// + public void Advance(int count) => ((IBufferWriter)bufferWriter).Advance(count); + + /// + public Memory GetMemory(int sizeHint = 0) => bufferWriter.GetMemory(sizeHint); + + /// + public Span GetSpan(int sizeHint = 0) => bufferWriter.GetSpan(sizeHint); + } + /// /// Provides functionality for writing to an output stream. /// diff --git a/src/Orleans.Serialization/Serializer.cs b/src/Orleans.Serialization/Serializer.cs index 875a12a326b..9630ada35f1 100644 --- a/src/Orleans.Serialization/Serializer.cs +++ b/src/Orleans.Serialization/Serializer.cs @@ -408,6 +408,36 @@ public T Deserialize(PooledBuffer.BufferSlice source, SerializerSession sessi return codec.ReadValue(ref reader, field); } + /// + /// Deserialize a value of type from . + /// + /// The serialized type. + /// The source buffer. + /// The deserialized value. + public T Deserialize(ArcBuffer source) + { + using var session = _sessionPool.GetSession(); + var reader = Reader.Create(source, session); + var codec = session.CodecProvider.GetCodec(); + var field = reader.ReadFieldHeader(); + return codec.ReadValue(ref reader, field); + } + + /// + /// Deserialize a value of type from . + /// + /// The serialized type. + /// The source buffer. + /// The serializer session. + /// The deserialized value. + public T Deserialize(ArcBuffer source, SerializerSession session) + { + var reader = Reader.Create(source, session); + var codec = session.CodecProvider.GetCodec(); + var field = reader.ReadFieldHeader(); + return codec.ReadValue(ref reader, field); + } + /// /// Deserialize a value of type from . /// @@ -845,6 +875,32 @@ public T Deserialize(PooledBuffer.BufferSlice source, SerializerSession session) return _codec.ReadValue(ref reader, field); } + /// + /// Deserialize a value of type from . + /// + /// The source buffer. + /// The deserialized value. + public T Deserialize(ArcBuffer source) + { + using var session = _sessionPool.GetSession(); + var reader = Reader.Create(source, session); + var field = reader.ReadFieldHeader(); + return _codec.ReadValue(ref reader, field); + } + + /// + /// Deserialize a value of type from . + /// + /// The source buffer. + /// The serializer session. + /// The deserialized value. + public T Deserialize(ArcBuffer source, SerializerSession session) + { + var reader = Reader.Create(source, session); + var field = reader.ReadFieldHeader(); + return _codec.ReadValue(ref reader, field); + } + /// /// Deserialize a value of type from . /// diff --git a/test/Orleans.Serialization.UnitTests/Buffers/ArcBufferWriterSerializationTests.cs b/test/Orleans.Serialization.UnitTests/Buffers/ArcBufferWriterSerializationTests.cs new file mode 100644 index 00000000000..0322e4247a4 --- /dev/null +++ b/test/Orleans.Serialization.UnitTests/Buffers/ArcBufferWriterSerializationTests.cs @@ -0,0 +1,66 @@ +using Microsoft.Extensions.DependencyInjection; +using Orleans.Serialization.Buffers; +using Orleans.Serialization.Session; +using Xunit; + +namespace Orleans.Serialization.UnitTests.Buffers; + +[Trait("Category", "BVT")] +public sealed class ArcBufferWriterSerializationTests +{ + [Fact] + public void ArcBufferWriter_RoundTripsReaderWriterAcrossPages() + { + using var serviceProvider = new ServiceCollection() + .AddSerializer() + .BuildServiceProvider(); + var sessionPool = serviceProvider.GetRequiredService(); + var payload = new byte[ArcBufferWriter.MinimumPageSize + 17]; + for (var i = 0; i < payload.Length; i++) + { + payload[i] = (byte)i; + } + + using var bufferWriter = new ArcBufferWriter(); + using (var writerSession = sessionPool.GetSession()) + { + var writer = Writer.Create(bufferWriter, writerSession); + writer.WriteVarUInt32((uint)payload.Length); + writer.Write(payload); + writer.WriteUInt64(0xDEADBEEFUL); + writer.Commit(); + } + + using var buffer = bufferWriter.PeekSlice(bufferWriter.Length); + using var readerSession = sessionPool.GetSession(); + var reader = Reader.Create(buffer, readerSession); + Assert.Equal((uint)payload.Length, reader.ReadVarUInt32()); + Assert.Equal(payload, reader.ReadBytes((uint)payload.Length)); + Assert.Equal(0xDEADBEEFUL, reader.ReadUInt64()); + Assert.Equal(reader.Length, reader.Position); + } + + [Fact] + public void ArcBufferWriter_DeserializesThroughSerializer() + { + using var serviceProvider = new ServiceCollection() + .AddSerializer() + .BuildServiceProvider(); + var sessionPool = serviceProvider.GetRequiredService(); + var serializer = serviceProvider.GetRequiredService(); + var typedSerializer = serviceProvider.GetRequiredService>(); + var expected = new string('x', ArcBufferWriter.MinimumPageSize + 17); + + using var bufferWriter = new ArcBufferWriter(); + using (var writerSession = sessionPool.GetSession()) + { + var writer = Writer.Create(bufferWriter, writerSession); + serializer.Serialize(expected, ref writer); + writer.Commit(); + } + + using var buffer = bufferWriter.PeekSlice(bufferWriter.Length); + Assert.Equal(expected, serializer.Deserialize(buffer)); + Assert.Equal(expected, typedSerializer.Deserialize(buffer)); + } +}