Skip to content

Commit 2faa3f7

Browse files
committed
plug the L2 payload bits into the pipe
1 parent e5f0ee2 commit 2faa3f7

File tree

11 files changed

+139
-91
lines changed

11 files changed

+139
-91
lines changed

src/Libraries/Microsoft.Extensions.Caching.Hybrid/Internal/BufferChunk.cs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,23 @@ namespace Microsoft.Extensions.Caching.Hybrid.Internal;
1515
internal readonly struct BufferChunk
1616
{
1717
private const int FlagReturnToPool = (1 << 31);
18-
1918
private readonly int _lengthAndPoolFlag;
2019

21-
public byte[]? Array { get; } // null for default
20+
public byte[]? OversizedArray { get; } // null for default
21+
22+
public bool HasValue => OversizedArray is not null;
2223

24+
public int Offset { get; }
2325
public int Length => _lengthAndPoolFlag & ~FlagReturnToPool;
2426

2527
public bool ReturnToPool => (_lengthAndPoolFlag & FlagReturnToPool) != 0;
2628

2729
public BufferChunk(byte[] array)
2830
{
2931
Debug.Assert(array is not null, "expected valid array input");
30-
Array = array;
32+
OversizedArray = array;
3133
_lengthAndPoolFlag = array!.Length;
34+
Offset = 0;
3235

3336
// assume not pooled, if exact-sized
3437
// (we don't expect array.Length to be negative; we're really just saying
@@ -39,11 +42,12 @@ public BufferChunk(byte[] array)
3942
Debug.Assert(Length == array.Length, "array length not respected");
4043
}
4144

42-
public BufferChunk(byte[] array, int length, bool returnToPool)
45+
public BufferChunk(byte[] array, int offset, int length, bool returnToPool)
4346
{
4447
Debug.Assert(array is not null, "expected valid array input");
4548
Debug.Assert(length >= 0, "expected valid length");
46-
Array = array;
49+
OversizedArray = array;
50+
Offset = offset;
4751
_lengthAndPoolFlag = length | (returnToPool ? FlagReturnToPool : 0);
4852
Debug.Assert(ReturnToPool == returnToPool, "return-to-pool not respected");
4953
Debug.Assert(Length == length, "length not respected");
@@ -58,7 +62,7 @@ public byte[] ToArray()
5862
}
5963

6064
var copy = new byte[length];
61-
Buffer.BlockCopy(Array!, 0, copy, 0, length);
65+
Buffer.BlockCopy(OversizedArray!, Offset, copy, 0, length);
6266
return copy;
6367

6468
// Note on nullability of Array; the usage here is that a non-null array
@@ -73,17 +77,19 @@ internal void RecycleIfAppropriate()
7377
{
7478
if (ReturnToPool)
7579
{
76-
ArrayPool<byte>.Shared.Return(Array!);
80+
ArrayPool<byte>.Shared.Return(OversizedArray!);
7781
}
7882

7983
Unsafe.AsRef(in this) = default; // anti foot-shotgun double-return guard; not 100%, but worth doing
80-
Debug.Assert(Array is null && !ReturnToPool, "expected clean slate after recycle");
84+
Debug.Assert(OversizedArray is null && !ReturnToPool, "expected clean slate after recycle");
8185
}
8286

83-
internal ReadOnlySpan<byte> AsSpan() => Length == 0 ? default : new(Array!, 0, Length);
87+
internal ArraySegment<byte> AsArraySegment() => Length == 0 ? default! : new(OversizedArray!, Offset, Length);
88+
89+
internal ReadOnlySpan<byte> AsSpan() => Length == 0 ? default : new(OversizedArray!, Offset, Length);
8490

8591
// get the data as a ROS; for note on null-logic of Array!, see comment in ToArray
86-
internal ReadOnlySequence<byte> AsSequence() => Length == 0 ? default : new ReadOnlySequence<byte>(Array!, 0, Length);
92+
internal ReadOnlySequence<byte> AsSequence() => Length == 0 ? default : new ReadOnlySequence<byte>(OversizedArray!, Offset, Length);
8793

8894
internal BufferChunk DoNotReturnToPool()
8995
{

src/Libraries/Microsoft.Extensions.Caching.Hybrid/Internal/DefaultHybridCache.L2.cs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ internal partial class DefaultHybridCache
2020
private const string TagKeyPrefix = "__MSFT_HCT__";
2121
private static readonly DistributedCacheEntryOptions _tagInvalidationEntryOptions = new() { AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(MaxCacheDays) };
2222

23+
private static readonly TimeSpan _defaultTimeout = TimeSpan.FromHours(1);
24+
2325
[SuppressMessage("Performance", "CA1849:Call async methods when in an async method", Justification = "Manual sync check")]
2426
[SuppressMessage("Usage", "VSTHRD003:Avoid awaiting foreign Tasks", Justification = "Manual sync check")]
2527
[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Explicit async exception handling")]
2628
[SuppressMessage("Reliability", "CA2000:Dispose objects before losing scope", Justification = "Deliberate recycle only on success")]
27-
internal ValueTask<BufferChunk> GetFromL2Async(string key, CancellationToken token)
29+
internal ValueTask<BufferChunk> GetFromL2DirectAsync(string key, CancellationToken token)
2830
{
2931
switch (GetFeatures(CacheFeatures.BackendCache | CacheFeatures.BackendBuffers))
3032
{
@@ -54,7 +56,7 @@ internal ValueTask<BufferChunk> GetFromL2Async(string key, CancellationToken tok
5456
}
5557

5658
BufferChunk result = pendingBuffers.GetAwaiter().GetResult()
57-
? new(writer.DetachCommitted(out var length), length, returnToPool: true)
59+
? new(writer.DetachCommitted(out var length), 0, length, returnToPool: true)
5860
: default;
5961
writer.Dispose(); // it is not accidental that this isn't "using"; avoid recycling if not 100% sure what happened
6062
return new(result);
@@ -71,24 +73,24 @@ static async Task<BufferChunk> AwaitedLegacyAsync(Task<byte[]?> pending, Default
7173
static async Task<BufferChunk> AwaitedBuffersAsync(ValueTask<bool> pending, RecyclableArrayBufferWriter<byte> writer)
7274
{
7375
BufferChunk result = await pending.ConfigureAwait(false)
74-
? new(writer.DetachCommitted(out var length), length, returnToPool: true)
76+
? new(writer.DetachCommitted(out var length), 0, length, returnToPool: true)
7577
: default;
7678
writer.Dispose(); // it is not accidental that this isn't "using"; avoid recycling if not 100% sure what happened
7779
return result;
7880
}
7981
}
8082

81-
internal ValueTask SetL2Async(string key, in BufferChunk buffer, HybridCacheEntryOptions? options, CancellationToken token)
82-
=> HasBackendCache ? SetDirectL2Async(key, in buffer, GetOptions(options), token) : default;
83+
internal ValueTask SetL2Async(string key, CacheItem cacheItem, in BufferChunk buffer, HybridCacheEntryOptions? options, CancellationToken token)
84+
=> HasBackendCache ? WritePayloadAsync(key, cacheItem, buffer, options, token) : default;
8385

8486
internal ValueTask SetDirectL2Async(string key, in BufferChunk buffer, DistributedCacheEntryOptions options, CancellationToken token)
8587
{
86-
Debug.Assert(buffer.Array is not null, "array should be non-null");
88+
Debug.Assert(buffer.OversizedArray is not null, "array should be non-null");
8789
switch (GetFeatures(CacheFeatures.BackendCache | CacheFeatures.BackendBuffers))
8890
{
8991
case CacheFeatures.BackendCache: // legacy byte[]-based
90-
var arr = buffer.Array!;
91-
if (arr.Length != buffer.Length)
92+
var arr = buffer.OversizedArray!;
93+
if (buffer.Offset != 0 || arr.Length != buffer.Length)
9294
{
9395
// we'll need a right-sized snapshot
9496
arr = buffer.ToArray();
@@ -113,7 +115,7 @@ internal ValueTask InvalidateL2TagAsync(string tag, long timestamp, Cancellation
113115

114116
byte[] oversized = ArrayPool<byte>.Shared.Rent(sizeof(long));
115117
BinaryPrimitives.WriteInt64LittleEndian(oversized, timestamp);
116-
var pending = SetDirectL2Async(TagKeyPrefix + tag, new BufferChunk(oversized, sizeof(long), false), _tagInvalidationEntryOptions, token);
118+
var pending = SetDirectL2Async(TagKeyPrefix + tag, new BufferChunk(oversized, 0, sizeof(long), false), _tagInvalidationEntryOptions, token);
117119

118120
if (pending.IsCompletedSuccessfully)
119121
{
@@ -144,10 +146,10 @@ internal async Task<long> SafeReadTagInvalidationAsync(string tag)
144146
try
145147
{
146148
using var cts = new CancellationTokenSource(millisecondsDelay: READ_TIMEOUT);
147-
var buffer = await GetFromL2Async(TagKeyPrefix + tag, cts.Token).ConfigureAwait(false);
149+
var buffer = await GetFromL2DirectAsync(TagKeyPrefix + tag, cts.Token).ConfigureAwait(false);
148150

149151
long timestamp;
150-
if (buffer.Array is not null)
152+
if (buffer.OversizedArray is not null)
151153
{
152154
if (buffer.Length == sizeof(long))
153155
{
@@ -212,6 +214,20 @@ internal void SetL1<T>(string key, CacheItem<T> value, HybridCacheEntryOptions?
212214
}
213215
}
214216

217+
private async ValueTask WritePayloadAsync(string key, CacheItem cacheItem, BufferChunk payload, HybridCacheEntryOptions? options, CancellationToken token)
218+
{
219+
// bundle a serialized payload inside the wrapper used at the DC layer
220+
var maxLength = HybridCachePayload.GetMaxBytes(key, cacheItem.Tags, payload.Length);
221+
var oversized = ArrayPool<byte>.Shared.Rent(maxLength);
222+
223+
var length = HybridCachePayload.Write(oversized, key, cacheItem.CreationTimestamp, options?.Expiration ?? _defaultTimeout,
224+
HybridCachePayload.PayloadFlags.None, cacheItem.Tags, payload.AsSequence());
225+
226+
await SetDirectL2Async(key, new(oversized, 0, length, true), GetOptions(options), token).ConfigureAwait(false);
227+
228+
ArrayPool<byte>.Shared.Return(oversized);
229+
}
230+
215231
private BufferChunk GetValidPayloadSegment(byte[]? payload)
216232
{
217233
if (payload is not null)

src/Libraries/Microsoft.Extensions.Caching.Hybrid/Internal/DefaultHybridCache.StampedeStateT.cs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,10 @@ private async Task BackgroundFetchAsync()
180180
HybridCacheEventSource.Log.DistributedCacheGet();
181181
}
182182

183-
result = await Cache.GetFromL2Async(Key.Key, SharedToken).ConfigureAwait(false);
183+
result = await Cache.GetFromL2DirectAsync(Key.Key, SharedToken).ConfigureAwait(false);
184184
if (eventSourceEnabled)
185185
{
186-
if (result.Array is not null)
186+
if (result.HasValue)
187187
{
188188
HybridCacheEventSource.Log.DistributedCacheHit();
189189
}
@@ -213,10 +213,26 @@ private async Task BackgroundFetchAsync()
213213
result = default; // treat as "miss"
214214
}
215215

216-
if (result.Array is not null)
216+
if (result.HasValue)
217217
{
218-
SetResultAndRecycleIfAppropriate(ref result);
219-
return;
218+
// result is the wider payload including HC headers; unwrap it:
219+
switch (HybridCachePayload.TryParse(result.AsArraySegment(), Key.Key, CacheItem.Tags, Cache, out var payload,
220+
out var flags, out var entropy, out var pendingTags))
221+
{
222+
case HybridCachePayload.ParseResult.Success:
223+
// check any pending expirations, if necessary
224+
if (pendingTags.IsEmpty || !await Cache.IsAnyTagExpiredAsync(pendingTags, CacheItem.CreationTimestamp).ConfigureAwait(false))
225+
{
226+
// move into the payload segment (minus any framing/header/etc data)
227+
result = new(payload.Array!, payload.Offset, payload.Count, result.ReturnToPool);
228+
SetResultAndRecycleIfAppropriate(ref result);
229+
return;
230+
}
231+
232+
break;
233+
}
234+
235+
result.RecycleIfAppropriate();
220236
}
221237
}
222238

@@ -304,7 +320,7 @@ private async Task BackgroundFetchAsync()
304320
// We already have the payload serialized, so this is trivial to do.
305321
try
306322
{
307-
await Cache.SetL2Async(Key.Key, in buffer, _options, SharedToken).ConfigureAwait(false);
323+
await Cache.SetL2Async(Key.Key, cacheItem, in buffer, _options, SharedToken).ConfigureAwait(false);
308324

309325
if (eventSourceEnabled)
310326
{
@@ -377,15 +393,15 @@ private void SetDefaultResult()
377393
private void SetResultAndRecycleIfAppropriate(ref BufferChunk value)
378394
{
379395
// set a result from L2 cache
380-
Debug.Assert(value.Array is not null, "expected buffer");
396+
Debug.Assert(value.OversizedArray is not null, "expected buffer");
381397

382398
IHybridCacheSerializer<T> serializer = Cache.GetSerializer<T>();
383399
CacheItem<T> cacheItem;
384400
switch (CacheItem)
385401
{
386402
case ImmutableCacheItem<T> immutable:
387403
// deserialize; and store object; buffer can be recycled now
388-
immutable.SetValue(serializer.Deserialize(new(value.Array!, 0, value.Length)), value.Length);
404+
immutable.SetValue(serializer.Deserialize(new(value.OversizedArray!, value.Offset, value.Length)), value.Length);
389405
value.RecycleIfAppropriate();
390406
cacheItem = immutable;
391407
break;

src/Libraries/Microsoft.Extensions.Caching.Hybrid/Internal/DefaultHybridCache.TagInvalidation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ public bool IsTagExpired(string tag, long timestamp, out bool isPending)
137137
}
138138
}
139139

140-
141140
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Ack")]
142141
public ValueTask<bool> IsAnyTagExpiredAsync(TagSet tags, long timestamp)
143142
{
@@ -165,6 +164,7 @@ static async ValueTask<bool> SlowAsync(DefaultHybridCache @this, TagSet tags, lo
165164

166165
[System.Diagnostics.CodeAnalysis.SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "Ack")]
167166
[System.Diagnostics.CodeAnalysis.SuppressMessage("Performance", "CA1849:Call async methods when in an async method", Justification = "Completion-checked")]
167+
[System.Diagnostics.CodeAnalysis.SuppressMessage("Usage", "VSTHRD003:Avoid awaiting foreign Tasks", Justification = "Manual async unwrap")]
168168
public ValueTask<bool> IsTagExpiredAsync(string tag, long timestamp)
169169
{
170170
if (!_tagInvalidationTimes.TryGetValue(tag, out var pending))

src/Libraries/Microsoft.Extensions.Caching.Hybrid/Internal/DefaultHybridCache.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System;
5-
using System.Collections.Concurrent;
65
using System.Collections.Generic;
76
using System.Diagnostics.CodeAnalysis;
87
using System.Linq;
Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
namespace Microsoft.Extensions.Caching.Hybrid.Internal;
1111

1212
// logic related to the payload that we send to IDistributedCache
13-
internal static class DistributedCachePayload
13+
internal static class HybridCachePayload
1414
{
1515
// FORMAT (v1):
1616
// fixed-size header (so that it can be reliably broadcast)
@@ -172,8 +172,8 @@ static void WriteString(byte[] target, ref int offset, string value)
172172
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.OrderingRules", "SA1204:Static elements should appear before instance elements", Justification = "False positive?")]
173173
[System.Diagnostics.CodeAnalysis.SuppressMessage("Major Code Smell", "S109:Magic numbers should not be used", Justification = "Encoding details; clear in context")]
174174
[System.Diagnostics.CodeAnalysis.SuppressMessage("Major Code Smell", "S107:Methods should not have too many parameters", Justification = "Borderline")]
175-
public static ParseResult TryParse(ReadOnlySpan<byte> bytes, string key, TagSet knownTags, DefaultHybridCache cache,
176-
out ReadOnlySpan<byte> payload, out PayloadFlags flags, out ushort entropy, out TagSet pendingTags)
175+
public static ParseResult TryParse(ArraySegment<byte> source, string key, TagSet knownTags, DefaultHybridCache cache,
176+
out ArraySegment<byte> payload, out PayloadFlags flags, out ushort entropy, out TagSet pendingTags)
177177
{
178178
// note "cache" is used primarily for expiration checks; we don't automatically add etc
179179
entropy = 0;
@@ -183,7 +183,7 @@ public static ParseResult TryParse(ReadOnlySpan<byte> bytes, string key, TagSet
183183
int pendingTagsCount = 0;
184184

185185
pendingTags = TagSet.Empty;
186-
186+
ReadOnlySpan<byte> bytes = new(source.Array!, source.Offset, source.Count);
187187
if (bytes.Length < 19) // minimum needed for empty payload and zero tags
188188
{
189189
return ParseResult.NotRecognized;
@@ -292,7 +292,8 @@ public static ParseResult TryParse(ReadOnlySpan<byte> bytes, string key, TagSet
292292
return ParseResult.InvalidData;
293293
}
294294

295-
payload = bytes.Slice(0, payloadLength);
295+
var start = source.Offset + source.Count - (payloadLength + 2);
296+
payload = new(source.Array!, start, payloadLength);
296297

297298
// finalize the pending tag buffer (in-flight tag expirations)
298299
switch (pendingTagsCount)

src/Libraries/Microsoft.Extensions.Caching.Hybrid/Internal/RecyclableArrayBufferWriter.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ public Span<T> GetSpan(int sizeHint = 0)
131131
// create a standalone isolated copy of the buffer
132132
public T[] ToArray() => _buffer.AsSpan(0, _index).ToArray();
133133

134+
public ReadOnlySequence<T> AsSequence() => new(_buffer, 0, _index);
135+
134136
/// <summary>
135137
/// Disconnect the current buffer so that we can store it without it being recycled.
136138
/// </summary>

test/Libraries/Microsoft.Extensions.Caching.Hybrid.Tests/BufferReleaseTests.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,11 @@ private static bool Write(IBufferWriter<byte> destination, byte[]? buffer)
121121
using (RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue))
122122
{
123123
serializer.Serialize(await GetAsync(), writer);
124-
cache.BackendCache.Set(key, writer.ToArray());
124+
125+
var arr = ArrayPool<byte>.Shared.Rent(HybridCachePayload.GetMaxBytes(key, TagSet.Empty, writer.CommittedBytes));
126+
var bytes = HybridCachePayload.Write(arr, key, cache.CurrentTimestamp(), TimeSpan.FromHours(1), 0, TagSet.Empty, writer.AsSequence());
127+
cache.BackendCache.Set(key, new ReadOnlySpan<byte>(arr, 0, bytes).ToArray());
128+
ArrayPool<byte>.Shared.Return(arr);
125129
}
126130
#if DEBUG
127131
cache.DebugOnlyGetOutstandingBuffers(flush: true);
@@ -180,7 +184,11 @@ private static bool Write(IBufferWriter<byte> destination, byte[]? buffer)
180184
using (RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue))
181185
{
182186
serializer.Serialize(await GetAsync(), writer);
183-
cache.BackendCache.Set(key, writer.ToArray());
187+
188+
var arr = ArrayPool<byte>.Shared.Rent(HybridCachePayload.GetMaxBytes(key, TagSet.Empty, writer.CommittedBytes));
189+
var bytes = HybridCachePayload.Write(arr, key, cache.CurrentTimestamp(), TimeSpan.FromHours(1), 0, TagSet.Empty, writer.AsSequence());
190+
cache.BackendCache.Set(key, new ReadOnlySpan<byte>(arr, 0, bytes).ToArray());
191+
ArrayPool<byte>.Shared.Return(arr);
184192
}
185193
#if DEBUG
186194
cache.DebugOnlyGetOutstandingBuffers(flush: true);

0 commit comments

Comments
 (0)