Skip to content

Commit cef1b1c

Browse files
committed
SpanReader支持从数据流中持续读取数据来扩容读写器,以支持redis/mysql等部分数据帧过大的协议。
1 parent 1d99265 commit cef1b1c

File tree

5 files changed

+175
-72
lines changed

5 files changed

+175
-72
lines changed

NewLife.Core/Buffers/SpanReader.cs

+112-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77
namespace NewLife.Buffers;
88

99
/// <summary>Span读取器</summary>
10+
/// <remarks>
11+
/// 引用结构的Span读取器确保高性能无GC读取。
12+
/// 支持Stream扩展,当数据不足时,自动从数据流中读取,常用于解析Redis/MySql等协议。
13+
/// </remarks>
1014
public ref struct SpanReader
1115
{
1216
#region 属性
13-
private readonly ReadOnlySpan<Byte> _span;
17+
private ReadOnlySpan<Byte> _span;
1418
/// <summary>数据片段</summary>
1519
public ReadOnlySpan<Byte> Span => _span;
1620

@@ -39,7 +43,43 @@ public ref struct SpanReader
3943

4044
/// <summary>实例化Span读取器</summary>
4145
/// <param name="data"></param>
42-
public SpanReader(IPacket data) : this(data.GetSpan()) { }
46+
public SpanReader(IPacket data)
47+
{
48+
_data = data;
49+
_span = data.GetSpan();
50+
_total = data.Total;
51+
}
52+
#endregion
53+
54+
#region 扩容增强
55+
/// <summary>最大容量。多次从数据流读取数据时,受限于此最大值</summary>
56+
public Int32 MaxCapacity { get; set; }
57+
58+
private readonly Stream? _stream;
59+
private readonly Int32 _bufferSize;
60+
private IPacket? _data;
61+
private Int32 _total;
62+
63+
/// <summary>实例化Span读取器。支持从数据流中读取更多数据,突破大小限制</summary>
64+
/// <remarks>
65+
/// 解析网络协议时,有时候数据帧较大,超过特定缓冲区大小,导致无法一次性读取完整数据帧。
66+
/// 加入数据流参数后,在读取数据不足时,SpanReader会自动从数据流中读取一批数据。
67+
/// </remarks>
68+
/// <param name="stream">数据流。一般是网络流</param>
69+
/// <param name="data"></param>
70+
/// <param name="bufferSize"></param>
71+
public SpanReader(Stream stream, IPacket? data = null, Int32 bufferSize = 8192)
72+
{
73+
_stream = stream;
74+
_bufferSize = bufferSize;
75+
76+
if (data != null)
77+
{
78+
_data = data;
79+
_span = data.GetSpan();
80+
_total = data.Total;
81+
}
82+
}
4383
#endregion
4484

4585
#region 基础方法
@@ -69,8 +109,49 @@ public ReadOnlySpan<Byte> GetSpan(Int32 sizeHint = 0)
69109
/// <summary>确保缓冲区中有足够的空间。</summary>
70110
/// <param name="size">需要的字节数。</param>
71111
/// <exception cref="InvalidOperationException"></exception>
72-
private void EnsureSpace(Int32 size)
112+
public void EnsureSpace(Int32 size)
73113
{
114+
// 检查剩余空间大小,不足时,再从数据流中读取。此时需要注意,创建新的OwnerPacket后,需要先把之前剩余的一点数据拷贝过去,然后再读取Stream
115+
var remain = FreeCapacity;
116+
if (remain < size && _stream != null)
117+
{
118+
// 申请指定大小的数据包缓冲区,至少达到缓冲区大小,但不超过最大容量
119+
var idx = 0;
120+
var bsize = size;
121+
if (bsize < _bufferSize) bsize = _bufferSize;
122+
if (MaxCapacity > 0 && bsize > MaxCapacity - _total) bsize = MaxCapacity - _total;
123+
var pk = new OwnerPacket(bsize);
124+
if (_data != null && remain > 0)
125+
{
126+
if (!_data.TryGetArray(out var arr)) throw new NotSupportedException();
127+
128+
arr.AsSpan(_index, remain).CopyTo(pk.Buffer);
129+
idx += remain;
130+
}
131+
132+
_data.TryDispose();
133+
_data = pk;
134+
_index = 0;
135+
136+
// 多次读取,直到满足需求
137+
//var n = _stream.ReadExactly(pk.Buffer, pk.Offset + idx, pk.Length - idx);
138+
while (idx < size)
139+
{
140+
// 实际缓冲区大小可能大于申请大小,充分利用缓冲区,避免多次读取
141+
var len = pk.Buffer.Length - pk.Offset;
142+
var n = _stream.Read(pk.Buffer, pk.Offset + idx, len - idx);
143+
if (n <= 0) break;
144+
145+
idx += n;
146+
}
147+
if (idx < size)
148+
throw new InvalidOperationException("Not enough data to read.");
149+
pk.Resize(idx);
150+
151+
_span = pk.GetSpan();
152+
_total += idx - remain;
153+
}
154+
74155
if (_index + size > _span.Length)
75156
throw new InvalidOperationException("Not enough data to read.");
76157
}
@@ -213,7 +294,6 @@ public String ReadString(Int32 length = 0, Encoding? encoding = null)
213294
/// <summary>读取字节数组</summary>
214295
/// <param name="length"></param>
215296
/// <returns></returns>
216-
/// <exception cref="InvalidOperationException"></exception>
217297
public ReadOnlySpan<Byte> ReadBytes(Int32 length)
218298
{
219299
EnsureSpace(length);
@@ -223,6 +303,34 @@ public ReadOnlySpan<Byte> ReadBytes(Int32 length)
223303
return result;
224304
}
225305

306+
/// <summary>读取字节数组</summary>
307+
/// <param name="data"></param>
308+
/// <returns></returns>
309+
public Int32 Read(Span<Byte> data)
310+
{
311+
var length = data.Length;
312+
EnsureSpace(length);
313+
314+
var result = _span.Slice(_index, length);
315+
result.CopyTo(data);
316+
_index += length;
317+
return length;
318+
}
319+
320+
/// <summary>读取数据包。直接对内部数据包进行切片</summary>
321+
/// <param name="length"></param>
322+
/// <returns></returns>
323+
public IPacket ReadPacket(Int32 length)
324+
{
325+
if (_data == null) throw new InvalidOperationException("No data stream to read!");
326+
327+
//EnsureSpace(length);
328+
329+
var result = _data.Slice(_index, length);
330+
_index += length;
331+
return result;
332+
}
333+
226334
/// <summary>读取结构体</summary>
227335
/// <typeparam name="T"></typeparam>
228336
/// <returns></returns>

NewLife.Core/Http/WebSocketMessage.cs

+5-13
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Buffers.Binary;
2-
using System.Drawing;
32
using System.Text;
43
using NewLife.Buffers;
54
using NewLife.Data;
@@ -64,11 +63,7 @@ public Boolean Read(IPacket pk)
6463
{
6564
if (pk.Length < 2) return false;
6665

67-
var reader = new SpanReader(pk.GetSpan())
68-
{
69-
IsLittleEndian = false
70-
};
71-
//var reader = pk.GetStream();
66+
var reader = new SpanReader(pk) { IsLittleEndian = false };
7267
var b = reader.ReadByte();
7368

7469
Type = (WebSocketMessageType)(b & 0x7F);
@@ -96,18 +91,18 @@ public Boolean Read(IPacket pk)
9691
// 如果mask,剩下的就是数据,避免拷贝,提升性能
9792
if (!mask)
9893
{
99-
Payload = pk.Slice(reader.Position, (Int32)len, true);
94+
Payload = reader.ReadPacket((Int32)len);
10095
}
10196
else
10297
{
10398
var masks = new Byte[4];
104-
if (mask) reader.ReadBytes(4).CopyTo(masks);
99+
if (mask) reader.Read(masks);
105100
MaskKey = masks;
106101

107102
if (mask)
108103
{
109104
// 直接在数据缓冲区修改,避免拷贝
110-
Payload = pk.Slice(reader.Position, (Int32)len, true);
105+
Payload = reader.ReadPacket((Int32)len);
111106
var data = Payload.GetSpan();
112107
for (var i = 0; i < len; i++)
113108
{
@@ -152,10 +147,7 @@ public virtual IPacket ToPacket()
152147
if (Type == WebSocketMessageType.Close) size += len;
153148

154149
var rs = body.ExpandHeader(size);
155-
var writer = new SpanWriter(rs.GetSpan())
156-
{
157-
IsLittleEndian = false
158-
};
150+
var writer = new SpanWriter(rs) { IsLittleEndian = false };
159151

160152
writer.WriteByte((Byte)(0x80 | (Byte)Type));
161153

NewLife.Core/Messaging/DefaultMessage.cs

-2
Original file line numberDiff line numberDiff line change
@@ -176,14 +176,12 @@ public static Int32 GetLength(ReadOnlySpan<Byte> span)
176176
reader.Advance(2);
177177

178178
// 小于64k,直接返回
179-
//var len = pk.Data.ToUInt16(pk.Offset + 2);
180179
var len = reader.ReadUInt16();
181180
if (len < 0xFFFF) return 4 + len;
182181

183182
// 超过64k的超大数据包,再来4个字节
184183
if (span.Length < 8) return 0;
185184

186-
//return 8 + (Int32)pk.Data.ToUInt32(pk.Offset + 2 + 2);
187185
return 8 + reader.ReadInt32();
188186
}
189187

NewLife.Core/Net/Handlers/MessageCodec.cs

+2-49
Original file line numberDiff line numberDiff line change
@@ -185,54 +185,7 @@ public override Boolean Close(IHandlerContext context, String reason)
185185
/// <param name="offset">长度的偏移量</param>
186186
/// <param name="size">长度大小。0变长,1/2/4小端字节,-2/-4大端字节</param>
187187
/// <returns>数据帧长度(包含头部长度位)</returns>
188-
public static Int32 GetLength(IPacket pk, Int32 offset, Int32 size)
189-
{
190-
if (offset < 0) return pk.Total;
191-
192-
// 数据不够,连长度都读取不了
193-
if (offset >= pk.Total) return 0;
194-
195-
var reader = new SpanReader(pk.GetSpan());
196-
reader.Advance(offset);
197-
198-
// 读取大小
199-
var len = 0;
200-
switch (size)
201-
{
202-
case 0:
203-
// 计算变长的头部长度
204-
var p = reader.Position;
205-
len = reader.ReadEncodedInt() + reader.Position - p;
206-
break;
207-
case 1:
208-
len = reader.ReadByte();
209-
break;
210-
case 2:
211-
len = reader.ReadUInt16();
212-
break;
213-
case 4:
214-
len = reader.ReadInt32();
215-
break;
216-
case -2:
217-
reader.IsLittleEndian = false;
218-
len = reader.ReadUInt16();
219-
break;
220-
case -4:
221-
reader.IsLittleEndian = false;
222-
len = reader.ReadInt32();
223-
break;
224-
default:
225-
throw new NotSupportedException();
226-
}
227-
228-
// 判断后续数据是否足够
229-
if (len > pk.Total) return 0;
230-
231-
// 数据长度加上头部长度
232-
len += Math.Abs(size);
233-
234-
return offset + len;
235-
}
188+
public static Int32 GetLength(IPacket pk, Int32 offset, Int32 size) => GetLength(pk.GetSpan(), offset, size);
236189

237190
/// <summary>从数据流中获取整帧数据长度</summary>
238191
/// <param name="span">数据包</param>
@@ -246,7 +199,7 @@ public static Int32 GetLength(ReadOnlySpan<Byte> span, Int32 offset, Int32 size)
246199
// 数据不够,连长度都读取不了
247200
if (offset >= span.Length) return 0;
248201

249-
var reader = new SpanReader(span);
202+
var reader = new SpanReader(span) { IsLittleEndian = true };
250203
reader.Advance(offset);
251204

252205
// 读取大小

XUnitTest.Core/Buffers/SpanReaderTests.cs

+56-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
using System;
2-
using System.Text;
3-
using NewLife;
1+
using System.Text;
42
using NewLife.Buffers;
3+
using NewLife.Data;
54
using NewLife.Security;
65
using Xunit;
76

@@ -161,9 +160,62 @@ public void ReadBytesTest()
161160
{
162161
var data = new Byte[] { 1, 2, 3, 4, 5 };
163162
var reader = new SpanReader(data);
163+
reader.Advance(1);
164164

165165
var result = reader.ReadBytes(3);
166-
Assert.Equal(new Byte[] { 1, 2, 3 }, result.ToArray());
166+
Assert.Equal(new Byte[] { 2, 3, 4 }, result.ToArray());
167+
}
168+
169+
[Fact]
170+
public void ReadTest()
171+
{
172+
var data = new Byte[] { 1, 2, 3, 4, 5 };
173+
var reader = new SpanReader(data);
174+
reader.Advance(1);
175+
176+
var buf = new Byte[3];
177+
var result = reader.Read(buf);
178+
Assert.Equal(new Byte[] { 2, 3, 4 }, buf);
179+
}
180+
181+
[Fact]
182+
public void ReadPacket()
183+
{
184+
var data = new Byte[] { 1, 2, 3, 4, 5 };
185+
var reader = new SpanReader(data);
186+
reader.Advance(1);
187+
188+
//Assert.Throws<ArgumentOutOfRangeException>(() => reader.ReadPacket(3));
189+
try
190+
{
191+
reader.ReadPacket(3);
192+
}
193+
catch (Exception ex)
194+
{
195+
196+
Assert.NotNull(ex as InvalidOperationException);
197+
}
198+
199+
var pk = new ArrayPacket(data);
200+
reader = new SpanReader(pk);
201+
reader.Advance(1);
202+
203+
var result = reader.ReadPacket(3);
204+
Assert.Equal(new Byte[] { 2, 3, 4 }, result.ToArray());
205+
Assert.True(data == ((ArrayPacket)result).Buffer);
206+
}
207+
208+
[Fact]
209+
public void StreamReadTest()
210+
{
211+
var data = new Byte[] { 1, 2, 3, 4, 5 };
212+
var ms = new MemoryStream(data);
213+
var reader = new SpanReader(ms);
214+
//reader.Advance(1);
215+
reader.ReadByte();
216+
217+
var result = reader.ReadBytes(3);
218+
Assert.Equal(new Byte[] { 2, 3, 4 }, result.ToArray());
167219
}
168220

169221
[Fact]

0 commit comments

Comments
 (0)