Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project>
<PropertyGroup>
<Version>8.12.0</Version>
<Version>8.13.0</Version>
<LangVersion>12.0</LangVersion>
<Authors>Jeremy D. Miller;Babu Annamalai;Jaedyn Tonee</Authors>
<PackageIconUrl>https://martendb.io/logo.png</PackageIconUrl>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events.Projections;
using Marten.Storage;
using Marten.Subscriptions;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;
using Xunit.Abstractions;

namespace DaemonTests.MultiTenancy;

public class using_for_tenant_with_side_effects_and_subscriptions : OneOffConfigurationsContext
{
private readonly ITestOutputHelper _output;

public using_for_tenant_with_side_effects_and_subscriptions(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task try_to_append_with_for_tenant_in_projection()
{
StoreOptions(opts =>
{
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Projections.Add(new NumbersSubscription(), ProjectionLifecycle.Async);
opts.Projections.Errors.SkipApplyErrors = false;
opts.Logger(new TestOutputMartenLogger(_output));
});

using var session = theStore.LightweightSession("green");
session.Events.StartStream([new AEvent(), new BEvent(), new CEvent()]);
await session.SaveChangesAsync();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(5.Seconds());

var events = await theSession.Events.QueryAllRawEvents().Where(x => x.AnyTenant()).ToListAsync();
events.Count.ShouldBe(6);
}

[Fact]
public async Task try_to_append_with_for_tenant_in_subscription()
{
StoreOptions(opts =>
{
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Events.Subscribe(new NumberBatchSubscription());
opts.Projections.Errors.SkipApplyErrors = false;
opts.Logger(new TestOutputMartenLogger(_output));
});

using var session = theStore.LightweightSession("green");
session.Events.StartStream([new AEvent(), new BEvent(), new CEvent()]);
await session.SaveChangesAsync();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(5.Seconds());

var events = await theSession.Events.QueryAllRawEvents().Where(x => x.AnyTenant()).ToListAsync();
events.Count.ShouldBe(6);
}
}


public class NumbersSubscription: EventProjection
{
public override ValueTask ApplyAsync(IDocumentOperations operations, IEvent e, CancellationToken cancellation)
{
if (e.TenantId != "blue")
{
operations.ForTenant("blue").Events.Append(e.StreamId, e.Data);

}

return ValueTask.CompletedTask;
}
}

public class NumberBatchSubscription: ISubscription
{
public Task<IChangeListener> ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations,
CancellationToken cancellationToken)
{
foreach (var @event in page.Events)
{
if (@event.TenantId != "blue")
{
operations.ForTenant("blue").Events.Append(@event.StreamId, @event.Data);

}
}

return Task.FromResult(NullChangeListener.Instance);
}
}
4 changes: 4 additions & 0 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
private static string writeStatusMessage(IReadOnlyList<ShardState> projections)
{

if (!projections.Any())
return
"There is no recorded projection, subscription, or even high water mark activity detected. Is the daemon started correctly?";

var grid = new Grid<ShardState>();
grid.AddColumn("Shard Name", x => x.ShardName);
grid.AddColumn("Sequence", x => x.Sequence.ToString(), true);
Expand Down
26 changes: 22 additions & 4 deletions src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ProjectionUpdateBatch: IUpdateBatch, IAsyncDisposable, IDisposable,

private IMessageBatch? _batch;
private OperationPage? _current;
private DocumentSessionBase? _session;
private DocumentSessionBase _session;

internal ProjectionUpdateBatch(ProjectionOptions settings,
DocumentSessionBase? session, ShardExecutionMode mode, CancellationToken token)
Expand Down Expand Up @@ -179,7 +179,9 @@ void ISessionWorkTracker.Sort(StoreOptions options)
throw new NotSupportedException();
}

List<StreamAction> ISessionWorkTracker.Streams => new();
private readonly List<StreamAction> _streams = new();

List<StreamAction> ISessionWorkTracker.Streams => _streams;


IReadOnlyList<IStorageOperation> ISessionWorkTracker.AllOperations => throw new NotSupportedException();
Expand All @@ -196,12 +198,18 @@ void ISessionWorkTracker.EjectAllOfType(Type type)

bool ISessionWorkTracker.TryFindStream(string streamKey, out StreamAction stream)
{
throw new NotSupportedException();
stream = _streams
.FirstOrDefault(x => x.Key == streamKey);

return stream != null;
}

bool ISessionWorkTracker.TryFindStream(Guid streamId, out StreamAction stream)
{
throw new NotSupportedException();
stream = _streams
.FirstOrDefault(x => x.Id == streamId);

return stream != null;
}

bool ISessionWorkTracker.HasOutstandingWork()
Expand Down Expand Up @@ -282,6 +290,16 @@ public async Task WaitForCompletion()
await Queue.WaitForCompletionAsync().ConfigureAwait(false);

foreach (var patch in _patches) applyOperation(patch);

if (_streams.Any())
{
var eventStorage = _session.EventStorage();
foreach (var stream in _streams)
{
var op = eventStorage.QuickAppendEvents(stream);
applyOperation(op);
}
}
}

private void startNewPage(IMartenSession session)
Expand Down
9 changes: 9 additions & 0 deletions src/Marten/IDocumentOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ public interface IDocumentOperations: IQuerySession, IStorageOperations
/// <param name="entity"></param>
void Delete<T>(T entity) where T : notnull;

/// <summary>
/// Access data from another tenant and apply document or event updates to this
/// IDocumentSession for a separate tenant
/// </summary>
/// <param name="tenantId"></param>
/// <returns></returns>
new ITenantOperations ForTenant(string tenantId);


/// <summary>
/// Mark an entity of type T with either a numeric or Guid id for deletion upon the next call to SaveChanges()
/// </summary>
Expand Down
Loading