Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Repository workflow

- Open pull requests against `dotnet/orleans`.
- Unless otherwise specified, create new branches from `main` in the upstream `dotnet/orleans` repository (`upstream/main`).
- Do not push feature branches directly to `dotnet/orleans` or the `upstream` remote.
- Push feature branches to your personal fork, which is configured as `origin`.
- Create PRs using `dotnet/orleans` as the base repository and `origin`/personal fork branches as the head.

Example:

```powershell
git fetch upstream main
git switch -c <branch> upstream/main
git push --set-upstream origin <branch>
gh pr create --repo dotnet/orleans --base main --head ReubenBond:<branch>
```
37 changes: 31 additions & 6 deletions src/Orleans.Core/Messaging/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal sealed class Message : ISpanFormattable, IMessageReceiverCache
{
public const int LENGTH_HEADER_SIZE = 8;
public const int LENGTH_META_HEADER = 4;
internal const int MaxCacheInvalidationHeaderEntries = 16;

[NonSerialized]
private short _retryCount;
Expand Down Expand Up @@ -282,15 +283,16 @@ public bool IsExpirableMessage()
internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAddress? validAddress)
{
var grainAddressCacheUpdate = new GrainAddressCacheUpdate(invalidAddress, validAddress);
if (_cacheInvalidationHeader is null)
var cacheInvalidationHeader = _cacheInvalidationHeader;
if (cacheInvalidationHeader is null)
{
var newList = new List<GrainAddressCacheUpdate> { grainAddressCacheUpdate };
if (Interlocked.CompareExchange(ref _cacheInvalidationHeader, newList, null) is not null)
if (Interlocked.CompareExchange(ref _cacheInvalidationHeader, newList, null) is { } existingCacheInvalidationHeader)
{
// Another thread initialized it, add to the existing list
lock (_cacheInvalidationHeader)
lock (existingCacheInvalidationHeader)
{
_cacheInvalidationHeader.Add(grainAddressCacheUpdate);
AddCacheInvalidationHeaderEntry(existingCacheInvalidationHeader, grainAddressCacheUpdate);
}
}
else
Expand All @@ -300,13 +302,36 @@ internal void AddToCacheInvalidationHeader(GrainAddress invalidAddress, GrainAdd
}
else
{
lock (_cacheInvalidationHeader)
lock (cacheInvalidationHeader)
{
_cacheInvalidationHeader.Add(grainAddressCacheUpdate);
AddCacheInvalidationHeaderEntry(cacheInvalidationHeader, grainAddressCacheUpdate);
}
}
}

private static void AddCacheInvalidationHeaderEntry(List<GrainAddressCacheUpdate> cacheInvalidationHeader, GrainAddressCacheUpdate grainAddressCacheUpdate)
{
if (cacheInvalidationHeader.Count >= MaxCacheInvalidationHeaderEntries || ContainsCacheInvalidationHeaderEntry(cacheInvalidationHeader, grainAddressCacheUpdate.GrainId))
{
return;
}

cacheInvalidationHeader.Add(grainAddressCacheUpdate);
}

private static bool ContainsCacheInvalidationHeaderEntry(List<GrainAddressCacheUpdate> cacheInvalidationHeader, GrainId grainId)
{
foreach (var entry in cacheInvalidationHeader)
{
if (entry.GrainId.Equals(grainId))
{
return true;
}
}

return false;
}

public override string ToString() => $"{this}";

string IFormattable.ToString(string? format, IFormatProvider? formatProvider) => ToString();
Expand Down
52 changes: 52 additions & 0 deletions test/Orleans.Core.Tests/Messaging/MessageTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Net;
using Orleans.Runtime;
using TestExtensions;
using Xunit;

namespace UnitTests.Messaging;

public class MessageTests
{
[Fact, TestCategory("BVT")]
public void AddToCacheInvalidationHeader_LimitsHeaderLength()
{
var message = new Message();

for (var i = 0; i < Message.MaxCacheInvalidationHeaderEntries + 1; i++)
{
var grainId = GrainId.Create("test", i.ToString());
message.AddToCacheInvalidationHeader(CreateAddress(grainId, i), CreateAddress(grainId, i + 100));
}

var header = message.CacheInvalidationHeader;
Assert.NotNull(header);
Assert.Equal(Message.MaxCacheInvalidationHeaderEntries, header.Count);
Assert.DoesNotContain(header, update => update.GrainId.Equals(GrainId.Create("test", Message.MaxCacheInvalidationHeaderEntries.ToString())));
}

[Fact, TestCategory("BVT")]
public void AddToCacheInvalidationHeader_DeduplicatesByGrainId()
{
var message = new Message();
var grainId = GrainId.Create("test", "duplicate");
var invalidAddress = CreateAddress(grainId, 1);
var validAddress = CreateAddress(grainId, 2);

message.AddToCacheInvalidationHeader(invalidAddress, validAddress);
message.AddToCacheInvalidationHeader(CreateAddress(grainId, 3), CreateAddress(grainId, 4));

var header = message.CacheInvalidationHeader;
Assert.NotNull(header);
var update = Assert.Single(header);
Assert.Equal(grainId, update.GrainId);
Assert.Equal(invalidAddress, update.InvalidGrainAddress);
Assert.Equal(validAddress, update.ValidGrainAddress);
}

private static GrainAddress CreateAddress(GrainId grainId, int offset) => new()
{
GrainId = grainId,
ActivationId = ActivationId.NewId(),
SiloAddress = SiloAddress.New(IPAddress.Loopback, 10_000 + offset, offset + 1)
};
}
Loading