diff --git a/AGENTS.md b/AGENTS.md index d030b130a9f..25d7bb60d18 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,6 +1,7 @@ # Repository workflow - Open pull requests against `dotnet/orleans`. +- Unless otherwise specified, create new branches from `main` in the upstream `dotnet/orleans` repository (`upstream/main`). - Do not push feature branches directly to `dotnet/orleans` or the `upstream` remote. - Push feature branches to your personal fork, which is configured as `origin`. - Create PRs using `dotnet/orleans` as the base repository and `origin`/personal fork branches as the head. @@ -8,6 +9,8 @@ Example: ```powershell +git fetch upstream main +git switch -c upstream/main git push --set-upstream origin gh pr create --repo dotnet/orleans --base main --head ReubenBond: ``` diff --git a/src/Orleans.Core/Messaging/Message.cs b/src/Orleans.Core/Messaging/Message.cs index 807555c397f..f327a284ff3 100644 --- a/src/Orleans.Core/Messaging/Message.cs +++ b/src/Orleans.Core/Messaging/Message.cs @@ -11,6 +11,7 @@ internal sealed class Message : ISpanFormattable, IMessageReceiverCache { public const int LENGTH_HEADER_SIZE = 8; public const int LENGTH_META_HEADER = 4; + internal const int MaxCacheInvalidationHeaderEntries = 16; [NonSerialized] private short _retryCount; @@ -282,15 +283,16 @@ public bool IsExpirableMessage() internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAddress? validAddress) { var grainAddressCacheUpdate = new GrainAddressCacheUpdate(invalidAddress, validAddress); - if (_cacheInvalidationHeader is null) + var cacheInvalidationHeader = _cacheInvalidationHeader; + if (cacheInvalidationHeader is null) { var newList = new List { grainAddressCacheUpdate }; - if (Interlocked.CompareExchange(ref _cacheInvalidationHeader, newList, null) is not null) + if (Interlocked.CompareExchange(ref _cacheInvalidationHeader, newList, null) is { } existingCacheInvalidationHeader) { // Another thread initialized it, add to the existing list - lock (_cacheInvalidationHeader) + lock (existingCacheInvalidationHeader) { - _cacheInvalidationHeader.Add(grainAddressCacheUpdate); + AddCacheInvalidationHeaderEntry(existingCacheInvalidationHeader, grainAddressCacheUpdate); } } else @@ -300,13 +302,36 @@ internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAdd } else { - lock (_cacheInvalidationHeader) + lock (cacheInvalidationHeader) { - _cacheInvalidationHeader.Add(grainAddressCacheUpdate); + AddCacheInvalidationHeaderEntry(cacheInvalidationHeader, grainAddressCacheUpdate); } } } + private static void AddCacheInvalidationHeaderEntry(List cacheInvalidationHeader, GrainAddressCacheUpdate grainAddressCacheUpdate) + { + if (cacheInvalidationHeader.Count >= MaxCacheInvalidationHeaderEntries || ContainsCacheInvalidationHeaderEntry(cacheInvalidationHeader, grainAddressCacheUpdate.GrainId)) + { + return; + } + + cacheInvalidationHeader.Add(grainAddressCacheUpdate); + } + + private static bool ContainsCacheInvalidationHeaderEntry(List cacheInvalidationHeader, GrainId grainId) + { + foreach (var entry in cacheInvalidationHeader) + { + if (entry.GrainId.Equals(grainId)) + { + return true; + } + } + + return false; + } + public override string ToString() => $"{this}"; string IFormattable.ToString(string? format, IFormatProvider? formatProvider) => ToString(); diff --git a/test/Orleans.Core.Tests/Messaging/MessageTests.cs b/test/Orleans.Core.Tests/Messaging/MessageTests.cs new file mode 100644 index 00000000000..731c1db5654 --- /dev/null +++ b/test/Orleans.Core.Tests/Messaging/MessageTests.cs @@ -0,0 +1,52 @@ +using System.Net; +using Orleans.Runtime; +using TestExtensions; +using Xunit; + +namespace UnitTests.Messaging; + +public class MessageTests +{ + [Fact, TestCategory("BVT")] + public void AddToCacheInvalidationHeader_LimitsHeaderLength() + { + var message = new Message(); + + for (var i = 0; i < Message.MaxCacheInvalidationHeaderEntries + 1; i++) + { + var grainId = GrainId.Create("test", i.ToString()); + message.AddToCacheInvalidationHeader(CreateAddress(grainId, i), CreateAddress(grainId, i + 100)); + } + + var header = message.CacheInvalidationHeader; + Assert.NotNull(header); + Assert.Equal(Message.MaxCacheInvalidationHeaderEntries, header.Count); + Assert.DoesNotContain(header, update => update.GrainId.Equals(GrainId.Create("test", Message.MaxCacheInvalidationHeaderEntries.ToString()))); + } + + [Fact, TestCategory("BVT")] + public void AddToCacheInvalidationHeader_DeduplicatesByGrainId() + { + var message = new Message(); + var grainId = GrainId.Create("test", "duplicate"); + var invalidAddress = CreateAddress(grainId, 1); + var validAddress = CreateAddress(grainId, 2); + + message.AddToCacheInvalidationHeader(invalidAddress, validAddress); + message.AddToCacheInvalidationHeader(CreateAddress(grainId, 3), CreateAddress(grainId, 4)); + + var header = message.CacheInvalidationHeader; + Assert.NotNull(header); + var update = Assert.Single(header); + Assert.Equal(grainId, update.GrainId); + Assert.Equal(invalidAddress, update.InvalidGrainAddress); + Assert.Equal(validAddress, update.ValidGrainAddress); + } + + private static GrainAddress CreateAddress(GrainId grainId, int offset) => new() + { + GrainId = grainId, + ActivationId = ActivationId.NewId(), + SiloAddress = SiloAddress.New(IPAddress.Loopback, 10_000 + offset, offset + 1) + }; +}