diff --git a/Directory.Build.props b/Directory.Build.props index cf05841fe1..c31363ae07 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 8.12.0 + 8.13.0 12.0 Jeremy D. Miller;Babu Annamalai;Jaedyn Tonee https://martendb.io/logo.png diff --git a/src/DaemonTests/MultiTenancy/using_for_tenant_with_side_effects_and_subscriptions.cs b/src/DaemonTests/MultiTenancy/using_for_tenant_with_side_effects_and_subscriptions.cs new file mode 100644 index 0000000000..ba2dcfe720 --- /dev/null +++ b/src/DaemonTests/MultiTenancy/using_for_tenant_with_side_effects_and_subscriptions.cs @@ -0,0 +1,107 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events.Projections; +using Marten.Storage; +using Marten.Subscriptions; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; +using Xunit.Abstractions; + +namespace DaemonTests.MultiTenancy; + +public class using_for_tenant_with_side_effects_and_subscriptions : OneOffConfigurationsContext +{ + private readonly ITestOutputHelper _output; + + public using_for_tenant_with_side_effects_and_subscriptions(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task try_to_append_with_for_tenant_in_projection() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Projections.Add(new NumbersSubscription(), ProjectionLifecycle.Async); + opts.Projections.Errors.SkipApplyErrors = false; + opts.Logger(new TestOutputMartenLogger(_output)); + }); + + using var session = theStore.LightweightSession("green"); + session.Events.StartStream([new AEvent(), new BEvent(), new CEvent()]); + await session.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(5.Seconds()); + + var events = await theSession.Events.QueryAllRawEvents().Where(x => x.AnyTenant()).ToListAsync(); + events.Count.ShouldBe(6); + } + + [Fact] + public async Task try_to_append_with_for_tenant_in_subscription() + { + StoreOptions(opts => + { + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.Subscribe(new NumberBatchSubscription()); + opts.Projections.Errors.SkipApplyErrors = false; + opts.Logger(new TestOutputMartenLogger(_output)); + }); + + using var session = theStore.LightweightSession("green"); + session.Events.StartStream([new AEvent(), new BEvent(), new CEvent()]); + await session.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(5.Seconds()); + + var events = await theSession.Events.QueryAllRawEvents().Where(x => x.AnyTenant()).ToListAsync(); + events.Count.ShouldBe(6); + } +} + + +public class NumbersSubscription: EventProjection +{ + public override ValueTask ApplyAsync(IDocumentOperations operations, IEvent e, CancellationToken cancellation) + { + if (e.TenantId != "blue") + { + operations.ForTenant("blue").Events.Append(e.StreamId, e.Data); + + } + + return ValueTask.CompletedTask; + } +} + +public class NumberBatchSubscription: ISubscription +{ + public Task ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations, + CancellationToken cancellationToken) + { + foreach (var @event in page.Events) + { + if (@event.TenantId != "blue") + { + operations.ForTenant("blue").Events.Append(@event.StreamId, @event.Data); + + } + } + + return Task.FromResult(NullChangeListener.Instance); + } +} diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs index 2dd972dc6d..824590905c 100644 --- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs +++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs @@ -162,6 +162,10 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase private static string writeStatusMessage(IReadOnlyList projections) { + if (!projections.Any()) + return + "There is no recorded projection, subscription, or even high water mark activity detected. Is the daemon started correctly?"; + var grid = new Grid(); grid.AddColumn("Shard Name", x => x.ShardName); grid.AddColumn("Sequence", x => x.Sequence.ToString(), true); diff --git a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs index 7a3b100d9c..6b5d32c0b6 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs @@ -32,7 +32,7 @@ public class ProjectionUpdateBatch: IUpdateBatch, IAsyncDisposable, IDisposable, private IMessageBatch? _batch; private OperationPage? _current; - private DocumentSessionBase? _session; + private DocumentSessionBase _session; internal ProjectionUpdateBatch(ProjectionOptions settings, DocumentSessionBase? session, ShardExecutionMode mode, CancellationToken token) @@ -179,7 +179,9 @@ void ISessionWorkTracker.Sort(StoreOptions options) throw new NotSupportedException(); } - List ISessionWorkTracker.Streams => new(); + private readonly List _streams = new(); + + List ISessionWorkTracker.Streams => _streams; IReadOnlyList ISessionWorkTracker.AllOperations => throw new NotSupportedException(); @@ -196,12 +198,18 @@ void ISessionWorkTracker.EjectAllOfType(Type type) bool ISessionWorkTracker.TryFindStream(string streamKey, out StreamAction stream) { - throw new NotSupportedException(); + stream = _streams + .FirstOrDefault(x => x.Key == streamKey); + + return stream != null; } bool ISessionWorkTracker.TryFindStream(Guid streamId, out StreamAction stream) { - throw new NotSupportedException(); + stream = _streams + .FirstOrDefault(x => x.Id == streamId); + + return stream != null; } bool ISessionWorkTracker.HasOutstandingWork() @@ -282,6 +290,16 @@ public async Task WaitForCompletion() await Queue.WaitForCompletionAsync().ConfigureAwait(false); foreach (var patch in _patches) applyOperation(patch); + + if (_streams.Any()) + { + var eventStorage = _session.EventStorage(); + foreach (var stream in _streams) + { + var op = eventStorage.QuickAppendEvents(stream); + applyOperation(op); + } + } } private void startNewPage(IMartenSession session) diff --git a/src/Marten/IDocumentOperations.cs b/src/Marten/IDocumentOperations.cs index 5eb631bc19..89850552d8 100644 --- a/src/Marten/IDocumentOperations.cs +++ b/src/Marten/IDocumentOperations.cs @@ -22,6 +22,15 @@ public interface IDocumentOperations: IQuerySession, IStorageOperations /// void Delete(T entity) where T : notnull; + /// + /// Access data from another tenant and apply document or event updates to this + /// IDocumentSession for a separate tenant + /// + /// + /// + new ITenantOperations ForTenant(string tenantId); + + /// /// Mark an entity of type T with either a numeric or Guid id for deletion upon the next call to SaveChanges() ///