Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ protected virtual void OnFinalRelease() // any required release semantics

internal abstract class CacheItem<T> : CacheItem
{
public abstract bool TryGetSize(out long size);

// attempt to get a value that was *not* previously reserved
public abstract bool TryGetValue(out T value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ internal partial class DefaultHybridCache

private T _value = default!; // deferred until SetValue

public long Size { get; private set; } = -1;

public override bool DebugIsImmutable => true;

// get a shared instance that passes as "reserved"; doesn't need to be 100% singleton,
Expand All @@ -30,14 +32,24 @@ public static ImmutableCacheItem<T> GetReservedShared()
return obj;
}

public void SetValue(T value) => _value = value;
// Stores the materialized value together with its serialized size in bytes;
// a negative size means "unknown" (Size defaults to -1), which makes
// TryGetSize report false until a real size is supplied.
public void SetValue(T value, long size)
{
_value = value;
Size = size;
}

// Always succeeds: an immutable item holds the value directly, so no
// reservation or deserialization is required to read it.
public override bool TryGetValue(out T value)
{
value = _value;
return true; // always available
}

// Reports the serialized payload size when known; Size is initialized
// to -1 ("unknown"), in which case this returns false and callers should
// not apply a size to the L1 cache entry.
public override bool TryGetSize(out long size)
{
size = Size;
return size >= 0;
}

public override bool TryReserveBuffer(out BufferChunk buffer)
{
buffer = default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal ValueTask<BufferChunk> GetFromL2Async(string key, CancellationToken tok
return new(GetValidPayloadSegment(pendingLegacy.Result)); // already complete

case CacheFeatures.BackendCache | CacheFeatures.BackendBuffers: // IBufferWriter<byte>-based
var writer = RecyclableArrayBufferWriter<byte>.Create(MaximumPayloadBytes);
RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(MaximumPayloadBytes);
var cache = Unsafe.As<IBufferDistributedCache>(_backendCache!); // type-checked already
var pendingBuffers = cache.TryGetAsync(key, writer, token);
if (!pendingBuffers.IsCompletedSuccessfully)
Expand Down Expand Up @@ -95,13 +95,26 @@ internal void SetL1<T>(string key, CacheItem<T> value, HybridCacheEntryOptions?
if (value.TryReserve())
{
// based on CacheExtensions.Set<TItem>, but with post-eviction recycling
using var cacheEntry = _localCache.CreateEntry(key);

// intentionally use manual Dispose rather than "using"; confusingly, it is Dispose()
// that actually commits the add - so: if we fault, we don't want to try
// committing a partially configured cache entry
ICacheEntry cacheEntry = _localCache.CreateEntry(key);
cacheEntry.AbsoluteExpirationRelativeToNow = options?.LocalCacheExpiration ?? _defaultLocalCacheExpiration;
cacheEntry.Value = value;

if (value.TryGetSize(out var size))
{
cacheEntry = cacheEntry.SetSize(size);
}

if (value.NeedsEvictionCallback)
{
_ = cacheEntry.RegisterPostEvictionCallback(CacheItem.SharedOnEviction);
cacheEntry = cacheEntry.RegisterPostEvictionCallback(CacheItem.SharedOnEviction);
}

// commit
cacheEntry.Dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,38 @@ public void SetValue(ref BufferChunk buffer, IHybridCacheSerializer<T> serialize
buffer = default; // we're taking over the lifetime; the caller no longer has it!
}

// Serializes "value" eagerly into a pooled buffer (bounded by maxLength)
// and takes ownership of that buffer via DetachCommitted; the writer is
// disposed afterwards purely as a belt-and-braces measure, since the
// detach has already removed its buffers.
public void SetValue(T value, IHybridCacheSerializer<T> serializer, int maxLength)
{
_serializer = serializer;
var writer = RecyclableArrayBufferWriter<byte>.Create(maxLength);
serializer.Serialize(value, writer);

_buffer = new(writer.DetachCommitted(out var length), length, returnToPool: true);
writer.Dispose(); // no buffers left (we just detached them), but just in case of other logic
}

public override bool TryGetValue(out T value)
{
// only if we haven't already burned
if (!TryReserve())
if (TryReserve())
{
value = default!;
return false;
try
{
value = _serializer.Deserialize(_buffer.AsSequence());
return true;
}
finally
{
_ = Release();
}
}

try
{
value = _serializer.Deserialize(_buffer.AsSequence());
return true;
}
finally
value = default!;
return false;
}

// Reports the serialized payload length. A successful TryReserve/Release
// pair brackets the read so the pooled buffer cannot be recycled while its
// Length is being observed; if the item has already been released ("burned"),
// this fails and reports 0.
public override bool TryGetSize(out long size)
{
// only if we haven't already burned
if (TryReserve())
{
size = _buffer.Length;
_ = Release();
return true;
}

size = 0;
return false;
}

public override bool TryReserveBuffer(out BufferChunk buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using static Microsoft.Extensions.Caching.Hybrid.Internal.DefaultHybridCache;

namespace Microsoft.Extensions.Caching.Hybrid.Internal;

internal partial class DefaultHybridCache
{
internal sealed class StampedeState<TState, T> : StampedeState
{
[DoesNotReturn]
private static CacheItem<T> ThrowUnexpectedCacheItem() => throw new InvalidOperationException("Unexpected cache item");
private const HybridCacheEntryFlags FlagsDisableL1AndL2 = HybridCacheEntryFlags.DisableLocalCacheWrite | HybridCacheEntryFlags.DisableDistributedCacheWrite;

private readonly TaskCompletionSource<CacheItem<T>>? _result;
private TState? _state;
Expand Down Expand Up @@ -154,6 +154,9 @@ static async Task<T> AwaitedAsync(Task<CacheItem<T>> task)
=> (await task.ConfigureAwait(false)).GetReservedValue();
}

// Throw helper for switch defaults over CacheItem: reaching this indicates an
// internal invariant violation (the item was neither the expected immutable
// nor mutable shape); [DoesNotReturn] lets callers assign its "result".
[DoesNotReturn]
private static CacheItem<T> ThrowUnexpectedCacheItem() => throw new InvalidOperationException("Unexpected cache item");

[SuppressMessage("Resilience", "EA0014:The async method doesn't support cancellation", Justification = "In this case the cancellation token is provided internally via SharedToken")]
[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Exception is passed through to faulted task result")]
private async Task BackgroundFetchAsync()
Expand All @@ -175,29 +178,72 @@ private async Task BackgroundFetchAsync()
// nothing from L2; invoke the underlying data store
if ((Key.Flags & HybridCacheEntryFlags.DisableUnderlyingData) == 0)
{
var cacheItem = SetResult(await _underlying!(_state!, SharedToken).ConfigureAwait(false));

// note that at this point we've already released most or all of the waiting callers; everything
// else here is background

// write to L2 if appropriate
if ((Key.Flags & HybridCacheEntryFlags.DisableDistributedCacheWrite) == 0)
// invoke the callback supplied by the caller
T newValue = await _underlying!(_state!, SharedToken).ConfigureAwait(false);

// If we're writing this value *anywhere*, we're going to need to serialize; this is obvious
// in the case of L2, but we also need it for L1, because MemoryCache might be enforcing
// SizeLimit (we can't know - it is an abstraction), and for *that* we need to know the item size.
// Likewise, if we're writing to a MutableCacheItem, we'll be serializing *anyway* for the payload.
//
// Rephrasing that: the only scenario in which we *do not* need to serialize is if:
// - it is an ImmutableCacheItem
// - we're writing neither to L1 nor L2

CacheItem cacheItem = CacheItem;
bool skipSerialize = cacheItem is ImmutableCacheItem<T> && (Key.Flags & FlagsDisableL1AndL2) == FlagsDisableL1AndL2;

if (skipSerialize)
{
if (cacheItem.TryReserveBuffer(out var buffer))
{
// mutable: we've already serialized it for the shared cache item
await Cache.SetL2Async(Key.Key, in buffer, _options, SharedToken).ConfigureAwait(false);
_ = cacheItem.Release(); // because we reserved
}
else if (cacheItem.TryGetValue(out var value))
SetImmutableResultWithoutSerialize(newValue);
}
else if (cacheItem.TryReserve())
{
// ^^^ The first thing we need to do is make sure we're not getting into a thread race over buffer disposal.
// In particular, if this cache item is somehow so short-lived that the buffers would be released *before* we're
// done writing them to L2, which happens *after* we've provided the value to consumers.
RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(MaximumPayloadBytes); // note this lifetime spans the SetL2Async
IHybridCacheSerializer<T> serializer = Cache.GetSerializer<T>();
serializer.Serialize(newValue, writer);
BufferChunk buffer = new(writer.DetachCommitted(out var length), length, returnToPool: true); // remove buffer ownership from the writer
writer.Dispose(); // we're done with the writer

// protect "buffer" (this is why we "reserved") for writing to L2 if needed; SetResultPreSerialized
// *may* (depending on context) claim this buffer, in which case "bufferToRelease" gets reset, and
// the final RecycleIfAppropriate() is a no-op; however, the buffer is valid in either event,
// (with TryReserve above guaranteeing that we aren't in a race condition).
BufferChunk bufferToRelease = buffer;

// and since "bufferToRelease" is the thing that will be returned at some point, we can make it explicit
// that we do not need or want "buffer" to do any recycling (they're the same memory)
buffer = buffer.DoNotReturnToPool();

// set the underlying result for this operation (includes L1 write if appropriate)
SetResultPreSerialized(newValue, ref bufferToRelease, serializer);

// Note that at this point we've already released most or all of the waiting callers. Everything
// from this point onwards happens in the background, from the perspective of the calling code.

// Write to L2 if appropriate.
if ((Key.Flags & HybridCacheEntryFlags.DisableDistributedCacheWrite) == 0)
{
// immutable: we'll need to do the serialize ourselves
var writer = RecyclableArrayBufferWriter<byte>.Create(MaximumPayloadBytes); // note this lifetime spans the SetL2Async
Cache.GetSerializer<T>().Serialize(value, writer);
buffer = new(writer.GetBuffer(out var length), length, returnToPool: false); // writer still owns the buffer
// We already have the payload serialized, so this is trivial to do.
await Cache.SetL2Async(Key.Key, in buffer, _options, SharedToken).ConfigureAwait(false);
writer.Dispose(); // recycle on success
}

// Release our hook on the CacheItem (only really important for "mutable").
_ = cacheItem.Release();

// Finally, recycle whatever was left over from SetResultPreSerialized; using "bufferToRelease"
// here is NOT a typo; if SetResultPreSerialized left this value alone (immutable), then
// this is our recycle step; if SetResultPreSerialized transferred ownership to the (mutable)
// CacheItem, then this becomes a no-op, and the buffer only gets fully recycled when the
// CacheItem itself is fully clear.
bufferToRelease.RecycleIfAppropriate();
}
else
{
throw new InvalidOperationException("Internal HybridCache failure: unable to reserve cache item to assign result");
}
}
else
Expand Down Expand Up @@ -237,13 +283,13 @@ private void SetResultAndRecycleIfAppropriate(ref BufferChunk value)
// set a result from L2 cache
Debug.Assert(value.Array is not null, "expected buffer");

var serializer = Cache.GetSerializer<T>();
IHybridCacheSerializer<T> serializer = Cache.GetSerializer<T>();
CacheItem<T> cacheItem;
switch (CacheItem)
{
case ImmutableCacheItem<T> immutable:
// deserialize; and store object; buffer can be recycled now
immutable.SetValue(serializer.Deserialize(new(value.Array!, 0, value.Length)));
immutable.SetValue(serializer.Deserialize(new(value.Array!, 0, value.Length)), value.Length);
value.RecycleIfAppropriate();
cacheItem = immutable;
break;
Expand All @@ -261,20 +307,43 @@ private void SetResultAndRecycleIfAppropriate(ref BufferChunk value)
SetResult(cacheItem);
}

private CacheItem<T> SetResult(T value)
private void SetImmutableResultWithoutSerialize(T value)
{
Debug.Assert((Key.Flags & FlagsDisableL1AndL2) == FlagsDisableL1AndL2, "Only expected if L1+L2 disabled");

// set a result from a value we calculated directly
CacheItem<T> cacheItem;
switch (CacheItem)
{
case ImmutableCacheItem<T> immutable:
// no serialize needed
immutable.SetValue(value);
immutable.SetValue(value, size: -1);
cacheItem = immutable;
break;
default:
cacheItem = ThrowUnexpectedCacheItem();
break;
}

SetResult(cacheItem);
}

private void SetResultPreSerialized(T value, ref BufferChunk buffer, IHybridCacheSerializer<T> serializer)
{
// set a result from a value we calculated directly that
// has ALREADY BEEN SERIALIZED (we can optionally consume this buffer)
CacheItem<T> cacheItem;
switch (CacheItem)
{
case ImmutableCacheItem<T> immutable:
// no serialize needed
immutable.SetValue(value, size: buffer.Length);
cacheItem = immutable;

// (but leave the buffer alone)
break;
case MutableCacheItem<T> mutable:
// serialization happens here
mutable.SetValue(value, Cache.GetSerializer<T>(), MaximumPayloadBytes);
mutable.SetValue(ref buffer, serializer);
mutable.DebugOnlyTrackBuffer(Cache);
cacheItem = mutable;
break;
Expand All @@ -284,7 +353,6 @@ private CacheItem<T> SetResult(T value)
}

SetResult(cacheItem);
return cacheItem;
}

private void SetResult(CacheItem<T> value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ private static ServiceProvider GetDefaultCache(out DefaultHybridCache cache, Act
var services = new ServiceCollection();
config?.Invoke(services);
services.AddHybridCache();
var provider = services.BuildServiceProvider();
ServiceProvider provider = services.BuildServiceProvider();
cache = Assert.IsType<DefaultHybridCache>(provider.GetRequiredService<HybridCache>());
return provider;
}
Expand Down Expand Up @@ -117,8 +117,8 @@ private static bool Write(IBufferWriter<byte> destination, byte[]? buffer)
// prep the backend with our data
var key = Me();
Assert.NotNull(cache.BackendCache);
var serializer = cache.GetSerializer<Customer>();
using (var writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue))
IHybridCacheSerializer<Customer> serializer = cache.GetSerializer<Customer>();
using (RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue))
{
serializer.Serialize(await GetAsync(), writer);
cache.BackendCache.Set(key, writer.ToArray());
Expand Down Expand Up @@ -176,8 +176,8 @@ private static bool Write(IBufferWriter<byte> destination, byte[]? buffer)
// prep the backend with our data
var key = Me();
Assert.NotNull(cache.BackendCache);
var serializer = cache.GetSerializer<Customer>();
using (var writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue))
IHybridCacheSerializer<Customer> serializer = cache.GetSerializer<Customer>();
using (RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue))
{
serializer.Serialize(await GetAsync(), writer);
cache.BackendCache.Set(key, writer.ToArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public async Task ReadOnlySequenceBufferRoundtrip(int size, SequenceKind kind)
Assert.Equal(size, expected.Length);
cache.Set(key, payload, _fiveMinutes);

var writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue);
RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue);
Assert.True(cache.TryGet(key, writer));
Assert.True(expected.Span.SequenceEqual(writer.GetCommittedMemory().Span));
writer.ResetInPlace();
Expand Down Expand Up @@ -247,7 +247,7 @@ public async Task ReadOnlySequenceBufferRoundtripAsync(int size, SequenceKind ki
Assert.Equal(size, expected.Length);
await cache.SetAsync(key, payload, _fiveMinutes);

var writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue);
RecyclableArrayBufferWriter<byte> writer = RecyclableArrayBufferWriter<byte>.Create(int.MaxValue);
Assert.True(await cache.TryGetAsync(key, writer));
Assert.True(expected.Span.SequenceEqual(writer.GetCommittedMemory().Span));
writer.ResetInPlace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ private static ServiceProvider GetDefaultCache(out DefaultHybridCache cache, Act
var services = new ServiceCollection();
config?.Invoke(services);
services.AddHybridCache();
var provider = services.BuildServiceProvider();
ServiceProvider provider = services.BuildServiceProvider();
cache = Assert.IsType<DefaultHybridCache>(provider.GetRequiredService<HybridCache>());
return provider;
}
Expand Down
Loading