Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/Persistence/EfCoreTests/EfCoreCompilationScenarios.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using EfCoreTests.MultiTenancy;
using JasperFx.CodeGeneration;
using Microsoft.Extensions.DependencyInjection;
using NSubstitute;
using SharedPersistenceModels.Items;
using Wolverine.ComplianceTests;
using Wolverine;
using Wolverine.EntityFrameworkCore;

namespace EfCoreTests;

Expand All @@ -15,10 +17,12 @@ public async Task ef_context_is_scoped_and_options_are_scoped()
{
using var host = WolverineHost.For(opts =>
{
opts.CodeGeneration.TypeLoadMode = TypeLoadMode.Auto;
opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(CreateItemHandler));

// Default of both is scoped
opts.Services.AddDbContext<SampleDbContext>();

opts.UseEntityFrameworkCoreTransactions();
});

await host.MessageBus().InvokeAsync(new CreateItem { Name = "foo" });
Expand All @@ -29,8 +33,11 @@ public async Task ef_context_is_scoped_and_options_are_singleton()
{
var host = await WolverineHost.ForAsync(opts =>
{
opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(CreateItemHandler));
// Default of both is scoped
opts.Services.AddDbContext<SampleDbContext>(optionsLifetime: ServiceLifetime.Singleton);

opts.UseEntityFrameworkCoreTransactions();
});

await host.MessageBus().InvokeAsync(new CreateItem { Name = "foo" });
Expand All @@ -43,8 +50,12 @@ public async Task ef_context_is_singleton_and_options_are_singleton()
{
using var host = WolverineHost.For(opts =>
{
opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(CreateItemHandler));

// Default of both is scoped
opts.Services.AddDbContext<SampleDbContext>(ServiceLifetime.Singleton, ServiceLifetime.Singleton);

opts.UseEntityFrameworkCoreTransactions();
});

await host.MessageBus().InvokeAsync(new CreateItem { Name = "foo" });
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
// <auto-generated/>
#pragma warning disable
using Microsoft.Extensions.DependencyInjection;
using Wolverine.EntityFrameworkCore;

namespace Internal.Generated.WolverineHandlers
{
// START: CreateItemHandler1945924936
[global::System.CodeDom.Compiler.GeneratedCode("JasperFx", "1.0.0")]
public sealed class CreateItemHandler1945924936 : Wolverine.Runtime.Handlers.MessageHandler
{
private readonly Wolverine.EntityFrameworkCore.IDbContextOutboxFactory _dbContextOutboxFactory;
private readonly Microsoft.Extensions.DependencyInjection.IServiceScopeFactory _serviceScopeFactory;

public CreateItemHandler1945924936(Microsoft.Extensions.DependencyInjection.IServiceScopeFactory serviceScopeFactory)
public CreateItemHandler1945924936(Wolverine.EntityFrameworkCore.IDbContextOutboxFactory dbContextOutboxFactory, Microsoft.Extensions.DependencyInjection.IServiceScopeFactory serviceScopeFactory)
{
_dbContextOutboxFactory = dbContextOutboxFactory;
_serviceScopeFactory = serviceScopeFactory;
}



public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation)
{
var tenantIdentifier = new JasperFx.MultiTenancy.TenantId(context.TenantId);
using var serviceScope = _serviceScopeFactory.CreateScope();

/*
Expand All @@ -29,12 +33,16 @@ public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.
// The actual message body
var createItem = (EfCoreTests.CreateItem)context.Envelope.Message;

System.Diagnostics.Activity.Current?.SetTag("message.handler", "EfCoreTests.CreateItemHandler");
var createItemHandler = new EfCoreTests.CreateItemHandler();
var myMessageHandler = new EfCoreTests.MultiTenancy.MyMessageHandler(_dbContextOutboxFactory);

// The actual message execution
await createItemHandler.Handle(createItem, sampleDbContext).ConfigureAwait(false);


// The actual message execution
await myMessageHandler.HandleAsync(createItem, tenantIdentifier, cancellation).ConfigureAwait(false);

}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ public MyMessageHandler(IDbContextOutboxFactory factory)
_factory = factory;
}

public async Task HandleAsync(CreateItem command, string tenantId, CancellationToken cancellationToken)
public async Task HandleAsync(CreateItem command, TenantId tenantId, CancellationToken cancellationToken)
{
// Get an EF Core DbContext wrapped in a Wolverine IDbContextOutbox<ItemsDbContext>
// for message sending wrapped in a transaction spanning the DbContext and Wolverine
var outbox = await _factory.CreateForTenantAsync<ItemsDbContext>(tenantId, cancellationToken);
var outbox = await _factory.CreateForTenantAsync<ItemsDbContext>(tenantId.Value, cancellationToken);
var item = new Item { Name = command.Name, Id = CombGuidIdGeneration.NewGuid() };

outbox.DbContext.Items.Add(item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine;
using Wolverine.Runtime.Agents;
using Wolverine.Transports;

namespace MartenTests.MultiTenancy;
Expand Down Expand Up @@ -36,6 +37,15 @@ public Envelope envelopeFor(string tenantId)

return envelope;
}

public Envelope envelopeFor(string tenantId, int ownerId)
{
var envelope = ObjectMother.Envelope();
envelope.TenantId = tenantId;
envelope.OwnerId = ownerId;

return envelope;
}

[Fact]
public async Task store_incoming_with_no_tenant()
Expand Down Expand Up @@ -393,6 +403,60 @@ public async Task fetch_counts()
counts.Incoming.ShouldBe(envelopes.Count);
counts.Outgoing.ShouldBe(19);
}

[Fact]
public async Task delete_node_really_does_release_ownership_across_tenants()
{
await Stores.Outbox.StoreOutgoingAsync(envelopeFor(null), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor(null), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant1"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant1"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant1"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant2"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant2"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant2"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant2"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant2"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);
await Stores.Outbox.StoreOutgoingAsync(envelopeFor("tenant3"), 3);

await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant1", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant1", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant1", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant1", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant1", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant2", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant2", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant2", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant2", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant2", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant2", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant3", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant3", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant3", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant3", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant3", 3));
await Stores.Inbox.StoreIncomingAsync(envelopeFor("tenant3", 3));


var wolverineNode = new WolverineNode { NodeId = Guid.NewGuid(), AssignedNodeNumber = 3 };
await Stores.Nodes.PersistAsync(wolverineNode,
CancellationToken.None);

(await Stores.Nodes.LoadNodeAsync(wolverineNode.NodeId, CancellationToken.None)).ShouldNotBeNull();

await Stores.Nodes.DeleteAsync(wolverineNode.NodeId, 3);

(await Stores.Admin.AllIncomingAsync()).Any(x => x.OwnerId == 3).ShouldBeFalse();
(await Stores.Admin.AllOutgoingAsync()).Any(x => x.OwnerId == 3).ShouldBeFalse();
}

[Fact]
public async Task release_ownership_smoke()
Expand Down
2 changes: 2 additions & 0 deletions src/Persistence/MartenTests/read_aggregate_attribute_usage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public static LetterAggregateEnvelope Handle(FindAggregate command, [ReadAggrega
return new LetterAggregateEnvelope(aggregate);
}

/* ALTERNATIVE VERSION
[WolverineHandler]
public static LetterAggregateEnvelope Handle2(
FindAggregate command,
Expand All @@ -99,6 +100,7 @@ public static LetterAggregateEnvelope Handle2(
{
return aggregate == null ? null : new LetterAggregateEnvelope(aggregate);
}
*/
}

#endregion
83 changes: 81 additions & 2 deletions src/Wolverine/Persistence/Durability/MultiTenantedMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public MultiTenantedMessageStore(IMessageStore main, IWolverineRuntime runtime,
public Type MarkerType => typeof(T);
}

public partial class MultiTenantedMessageStore : IMessageStore, IMessageInbox, IMessageOutbox, IMessageStoreAdmin, IDeadLetters, ISagaSupport
public partial class MultiTenantedMessageStore : IMessageStore, IMessageInbox, IMessageOutbox, IMessageStoreAdmin, IDeadLetters, ISagaSupport, INodeAgentPersistence
{
private readonly ILogger _logger;
private readonly RetryBlock<IEnvelopeCommand> _retryBlock;
Expand Down Expand Up @@ -320,7 +320,7 @@ public async Task InitializeAsync(IWolverineRuntime runtime)
public IMessageInbox Inbox => this;
public IMessageOutbox Outbox => this;
public IDeadLetters DeadLetters => this;
public INodeAgentPersistence Nodes => Main.Nodes;
public INodeAgentPersistence Nodes => this;
public IMessageStoreAdmin Admin => this;

public void Describe(TextWriter writer)
Expand Down Expand Up @@ -668,4 +668,83 @@ public async ValueTask<ISagaStorage<TId, TSaga>> EnrollAndFetchSagaStorage<TId,
throw new InvalidOperationException(
"The tenant stores do not implement ISagaSupport and cannot be used for saga persistence");
}

Task INodeAgentPersistence.ClearAllAsync(CancellationToken cancellationToken)
{
return Main.Nodes.ClearAllAsync(cancellationToken);
}

Task<int> INodeAgentPersistence.PersistAsync(WolverineNode node, CancellationToken cancellationToken)
{
return Main.Nodes.PersistAsync(node, cancellationToken);
}

async Task INodeAgentPersistence.DeleteAsync(Guid nodeId, int assignedNodeNumber)
{
await Main.Nodes.DeleteAsync(nodeId, assignedNodeNumber);
await executeOnAllAsync(async store =>
{
await store.Admin.ReleaseAllOwnershipAsync(assignedNodeNumber);
});
}

Task<IReadOnlyList<WolverineNode>> INodeAgentPersistence.LoadAllNodesAsync(CancellationToken cancellationToken)
{
return Main.Nodes.LoadAllNodesAsync(cancellationToken);
}

Task INodeAgentPersistence.AssignAgentsAsync(Guid nodeId, IReadOnlyList<Uri> agents, CancellationToken cancellationToken)
{
return Main.Nodes.AssignAgentsAsync(nodeId, agents, cancellationToken);
}

Task INodeAgentPersistence.RemoveAssignmentAsync(Guid nodeId, Uri agentUri, CancellationToken cancellationToken)
{
return Main.Nodes.RemoveAssignmentAsync(nodeId, agentUri, cancellationToken);
}

Task INodeAgentPersistence.AddAssignmentAsync(Guid nodeId, Uri agentUri, CancellationToken cancellationToken)
{
return Main.Nodes.AddAssignmentAsync(nodeId, agentUri, cancellationToken);
}

Task<WolverineNode?> INodeAgentPersistence.LoadNodeAsync(Guid nodeId, CancellationToken cancellationToken)
{
return Main.Nodes.LoadNodeAsync(nodeId, cancellationToken);
}

Task INodeAgentPersistence.MarkHealthCheckAsync(WolverineNode node, CancellationToken cancellationToken)
{
return Main.Nodes.MarkHealthCheckAsync(node, cancellationToken);
}

Task INodeAgentPersistence.OverwriteHealthCheckTimeAsync(Guid nodeId, DateTimeOffset lastHeartbeatTime)
{
return Main.Nodes.OverwriteHealthCheckTimeAsync(nodeId, lastHeartbeatTime);
}

Task INodeAgentPersistence.LogRecordsAsync(params NodeRecord[] records)
{
return Main.Nodes.LogRecordsAsync(records);
}

Task<IReadOnlyList<NodeRecord>> INodeAgentPersistence.FetchRecentRecordsAsync(int count)
{
return Main.Nodes.FetchRecentRecordsAsync(count);
}

bool INodeAgentPersistence.HasLeadershipLock()
{
return Main.Nodes.HasLeadershipLock();
}

Task<bool> INodeAgentPersistence.TryAttainLeadershipLockAsync(CancellationToken token)
{
return Main.Nodes.TryAttainLeadershipLockAsync(token);
}

Task INodeAgentPersistence.ReleaseLeadershipLockAsync()
{
return Main.Nodes.ReleaseLeadershipLockAsync();
}
}
4 changes: 0 additions & 4 deletions src/Wolverine/Persistence/EntityAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)

_guardFrames[0].GenerateCode(method, writer);
}
else if (_creator.Next != null)
{
Debug.WriteLine("What the heck?");
}
else
{
var previous = _creator;
Expand Down
Loading