diff --git a/Directory.Build.props b/Directory.Build.props index 4c9315a6f1..6d84c8f3d2 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 8.0.0-beta-2 + 8.0.0-rc-1 12.0 Jeremy D. Miller;Babu Annamalai;Oskar Dudycz;Joona-Pekka Kokko https://martendb.io/logo.png diff --git a/docs/events/configuration.md b/docs/events/configuration.md index 888b0a9cee..fe0e58d8af 100644 --- a/docs/events/configuration.md +++ b/docs/events/configuration.md @@ -62,19 +62,3 @@ var store = DocumentStore.For(opts => ``` snippet source | anchor - -By default, if you try to define projection with a single tenancy, Marten will throw an exception at runtime informing you about the mismatch. - -You can enable global projections for conjoined tenancy. - - - -```cs -opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; -``` -snippet source | anchor - - -::: warning -If you enable global projections for conjoined tenancy, Marten won't validate potential tenancy mismatch and won't throw an exception for that case. -::: diff --git a/src/CoreTests/bootstrapping_with_service_collection_extensions.cs b/src/CoreTests/bootstrapping_with_service_collection_extensions.cs index d0ca6926c2..e58ffc2224 100644 --- a/src/CoreTests/bootstrapping_with_service_collection_extensions.cs +++ b/src/CoreTests/bootstrapping_with_service_collection_extensions.cs @@ -21,6 +21,7 @@ using Npgsql; using Shouldly; using Weasel.Core.Migrations; +using Weasel.Core.MultiTenancy; using Xunit; namespace CoreTests; @@ -523,5 +524,8 @@ private static void ShouldHaveAllTheExpectedRegistrations(Container container, container.GetInstance().ShouldBeSameAs(store.As().Tenancy); container.Model.For>().Default.ShouldNotBeNull(); + + container.GetInstance() + .ShouldBeSameAs(store); } } diff --git a/src/CoreTests/describing_database_usage_from_DefaultTenancy.cs b/src/CoreTests/describing_database_usage_from_DefaultTenancy.cs index d3ec0f9254..8502d27241 100644 --- a/src/CoreTests/describing_database_usage_from_DefaultTenancy.cs +++ b/src/CoreTests/describing_database_usage_from_DefaultTenancy.cs @@ -1,6 +1,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using JasperFx.Core.Reflection; using JasperFx.Descriptors; using Marten.Storage; using Marten.Testing.Harness; @@ -19,7 +20,7 @@ public describing_database_usage_from_DefaultTenancy(DefaultStoreFixture fixture public async Task create_usage() { theStore.Options.Tenancy.ShouldBeOfType(); - theStore.Options.Tenancy.Cardinality.ShouldBe(DatabaseCardinality.Single); + theStore.Options.Tenancy.As().Cardinality.ShouldBe(DatabaseCardinality.Single); var description = await theStore.Options.Tenancy.DescribeDatabasesAsync(CancellationToken.None); description.Cardinality.ShouldBe(DatabaseCardinality.Single); diff --git a/src/DaemonTests/DaemonTests.csproj b/src/DaemonTests/DaemonTests.csproj index b71daa6075..57a5e26816 100644 --- a/src/DaemonTests/DaemonTests.csproj +++ b/src/DaemonTests/DaemonTests.csproj @@ -5,7 +5,6 @@ - @@ -14,6 +13,7 @@ runtime; build; native; contentfiles; analyzers + diff --git a/src/DaemonTests/Internals/projection_progression_operations.cs b/src/DaemonTests/Internals/projection_progression_operations.cs index 09037eab7c..4e94ffb6c5 100644 --- a/src/DaemonTests/Internals/projection_progression_operations.cs +++ b/src/DaemonTests/Internals/projection_progression_operations.cs @@ -1,6 +1,7 @@ using System.Linq; using System.Threading.Tasks; using JasperFx.Events; +using JasperFx.Events.Daemon; using JasperFx.Events.Projections; using Marten.Events.Daemon.Progress; using Marten.Exceptions; diff --git a/src/EventSourcingTests/Aggregation/aggregation_projection_validation_rules.cs b/src/EventSourcingTests/Aggregation/aggregation_projection_validation_rules.cs index b41f7c939b..72d6013d28 100644 --- a/src/EventSourcingTests/Aggregation/aggregation_projection_validation_rules.cs +++ b/src/EventSourcingTests/Aggregation/aggregation_projection_validation_rules.cs @@ -4,6 +4,7 @@ using JasperFx.CodeGeneration; using JasperFx.Core.Reflection; using JasperFx.Events; +using JasperFx.Events.Grouping; using JasperFx.Events.Internals; using JasperFx.Events.Projections; using Marten; @@ -80,23 +81,6 @@ public void if_events_are_multi_tenanted_and_global_projections_are_disabled_so_ $"Tenancy storage style mismatch between the events (Conjoined) and the aggregate type {typeof(GuidIdentifiedAggregate).FullNameInCode()} (Single)"); } - [Fact] - public void if_events_are_multi_tenanted_and_global_projections_are_enabled() - { - shouldNotThrow(opts => - { - opts.Events.TenancyStyle = TenancyStyle.Conjoined; - - #region sample_enabling_global_projections_for_conjoined_tenancy - - opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; - - #endregion - - opts.Projections.Snapshot(SnapshotLifecycle.Async); - }); - } - [Fact] public void if_the_aggregate_is_multi_tenanted_but_the_events_are_not() { @@ -131,6 +115,15 @@ public void Apply(AEvent a) } } + public class GuidIdentifiedAggregateProjection: MultiStreamProjection + { + public GuidIdentifiedAggregateProjection() + { + TenancyGrouping = TenancyGrouping.AcrossTenants; + Identity(x => x.StreamId); + } + } + public class StringIdentifiedAggregate { public string Id { get; set; } diff --git a/src/EventSourcingTests/Aggregation/when_doing_live_aggregations.cs b/src/EventSourcingTests/Aggregation/when_doing_live_aggregations.cs index 89b6f04073..ae9c5a0b90 100644 --- a/src/EventSourcingTests/Aggregation/when_doing_live_aggregations.cs +++ b/src/EventSourcingTests/Aggregation/when_doing_live_aggregations.cs @@ -1,6 +1,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using EventSourcingTests.FetchForWriting; using JasperFx.Events; using Marten; using Marten.Events; @@ -45,6 +46,43 @@ public async Task sync_apply_and_default_create() aggregate.DCount.ShouldBe(1); } + [Fact] + public async Task when_requesting_an_aggregate_for_an_invalid_version() + { + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new CEvent(), + new DEvent()); + await theSession.SaveChangesAsync(); + + var aggregate1 = await theSession.Events.AggregateStreamAsync(streamId); + var aggregateAt4 = await theSession.Events.AggregateStreamAsync(streamId, version:4); + var aggregateAt5 = await theSession.Events.AggregateStreamAsync(streamId, version: 5); + + aggregateAt4.ShouldBe(aggregate1); + + aggregateAt5.ShouldBeNull(); + } + + [Fact] + public async Task when_requesting_an_aggregate_for_an_invalid_version_with_string_identifiers() + { + UseStreamIdentity(StreamIdentity.AsString); + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new CEvent(), + new DEvent()); + await theSession.SaveChangesAsync(); + + var aggregate1 = await theSession.Events.AggregateStreamAsync(streamId); + var aggregateAt4 = await theSession.Events.AggregateStreamAsync(streamId, version:4); + var aggregateAt5 = await theSession.Events.AggregateStreamAsync(streamId, version: 5); + + aggregateAt4.ShouldBe(aggregate1); + + aggregateAt5.ShouldBeNull(); + } + [Fact] public async Task sync_apply_and_specific_create() { diff --git a/src/EventSourcingTests/Aggregation/when_finding_the_last_good_aggregation.cs b/src/EventSourcingTests/Aggregation/when_finding_the_last_good_aggregation.cs new file mode 100644 index 0000000000..b81e4f9759 --- /dev/null +++ b/src/EventSourcingTests/Aggregation/when_finding_the_last_good_aggregation.cs @@ -0,0 +1,239 @@ +using System; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Events; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Aggregation; + +public class when_finding_the_last_good_aggregation : IntegrationContext +{ + public when_finding_the_last_good_aggregation(DefaultStoreFixture fixture) : base(fixture) + { + } + + [Fact] + public async Task finding_last_aggregate_using_guid() + { + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new CEvent(), + new DEvent(), new DeleteYourself()); + await theSession.SaveChangesAsync(); + + // Tight semantics here. + (await theSession.Events.AggregateStreamAsync(streamId)).ShouldBeNull(); + var aggregateAt4 = await theSession.Events.AggregateStreamAsync(streamId, 4); + aggregateAt4.ShouldNotBeNull(); + + var lastGood = await theSession.Events.AggregateStreamToLastKnownAsync(streamId); + lastGood.ShouldNotBeNull(); + + lastGood.ShouldBe(aggregateAt4); + + // Now, mess things up and start over! + theSession.Events.Append(streamId, new DEvent(), new DEvent()); + await theSession.SaveChangesAsync(); + + var newLastGood = + await theSession.Events.AggregateStreamToLastKnownAsync(streamId); + newLastGood.ShouldNotBeNull(); + newLastGood.DCount.ShouldBe(2); + newLastGood.ACount.ShouldBe(0); + newLastGood.BCount.ShouldBe(0); + newLastGood.CCount.ShouldBe(0); + } + + [Fact] + public async Task finding_last_aggregate_using_string() + { + UseStreamIdentity(StreamIdentity.AsString); + var streamKey = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamKey, new AEvent(), new BEvent(), new CEvent(), + new DEvent(), new DeleteYourself()); + await theSession.SaveChangesAsync(); + + // Tight semantics here. + (await theSession.Events.AggregateStreamAsync(streamKey)).ShouldBeNull(); + var aggregateAt4 = await theSession.Events.AggregateStreamAsync(streamKey, 4); + aggregateAt4.ShouldNotBeNull(); + + var lastGood = await theSession.Events.AggregateStreamToLastKnownAsync(streamKey); + lastGood.ShouldNotBeNull(); + + lastGood.ShouldBe(aggregateAt4); + + // Now, mess things up and start over! + theSession.Events.Append(streamKey, new DEvent(), new DEvent()); + await theSession.SaveChangesAsync(); + + var newLastGood = + await theSession.Events.AggregateStreamToLastKnownAsync(streamKey); + newLastGood.ShouldNotBeNull(); + newLastGood.DCount.ShouldBe(2); + newLastGood.ACount.ShouldBe(0); + newLastGood.BCount.ShouldBe(0); + newLastGood.CCount.ShouldBe(0); + } +} + +public record DeleteYourself; + +public class SimpleMaybeDeletedAggregate : IRevisioned +{ + // This will be the aggregate version + public int Version { get; set; } + + public bool ShouldDelete(DeleteYourself _) => true; + + public Guid Id { get; + set; } + + public int ACount { get; set; } + public int BCount { get; set; } + public int CCount { get; set; } + public int DCount { get; set; } + public int ECount { get; set; } + + public void Apply(AEvent _) + { + ACount++; + } + + public void Apply(BEvent _) + { + BCount++; + } + + public void Apply(CEvent _) + { + CCount++; + } + + public void Apply(DEvent _) + { + DCount++; + } + + public void Apply(EEvent _) + { + ECount++; + } + + protected bool Equals(SimpleMaybeDeletedAggregate other) + { + return Version == other.Version && Id.Equals(other.Id) && ACount == other.ACount && BCount == other.BCount && CCount == other.CCount && DCount == other.DCount && ECount == other.ECount; + } + + public override bool Equals(object obj) + { + if (obj is null) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((SimpleMaybeDeletedAggregate)obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(Version, Id, ACount, BCount, CCount, DCount, ECount); + } + + public override string ToString() + { + return + $"{nameof(Version)}: {Version}, {nameof(Id)}: {Id}, {nameof(ACount)}: {ACount}, {nameof(BCount)}: {BCount}, {nameof(CCount)}: {CCount}, {nameof(DCount)}: {DCount}, {nameof(ECount)}: {ECount}"; + } +} + +public class SimpleAsStringMaybeDeletedAggregate : IRevisioned +{ + protected bool Equals(SimpleAsStringMaybeDeletedAggregate other) + { + return Version == other.Version && Id == other.Id && ACount == other.ACount && BCount == other.BCount && CCount == other.CCount && DCount == other.DCount && ECount == other.ECount; + } + + public override bool Equals(object obj) + { + if (obj is null) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((SimpleAsStringMaybeDeletedAggregate)obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(Version, Id, ACount, BCount, CCount, DCount, ECount); + } + + // This will be the aggregate version + public int Version { get; set; } + + public bool ShouldDelete(DeleteYourself _) => true; + + public string Id { get; + set; } + + public int ACount { get; set; } + public int BCount { get; set; } + public int CCount { get; set; } + public int DCount { get; set; } + public int ECount { get; set; } + + public void Apply(AEvent _) + { + ACount++; + } + + public void Apply(BEvent _) + { + BCount++; + } + + public void Apply(CEvent _) + { + CCount++; + } + + public void Apply(DEvent _) + { + DCount++; + } + + public void Apply(EEvent _) + { + ECount++; + } + + public override string ToString() + { + return + $"{nameof(Version)}: {Version}, {nameof(Id)}: {Id}, {nameof(ACount)}: {ACount}, {nameof(BCount)}: {BCount}, {nameof(CCount)}: {CCount}, {nameof(DCount)}: {DCount}, {nameof(ECount)}: {ECount}"; + } +} diff --git a/src/EventSourcingTests/Bugs/document_and_event_operations_within_page.cs b/src/EventSourcingTests/Bugs/document_and_event_operations_within_page.cs deleted file mode 100644 index 9bcc979193..0000000000 --- a/src/EventSourcingTests/Bugs/document_and_event_operations_within_page.cs +++ /dev/null @@ -1,145 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using EventSourcingTests.Projections; -using JasperFx.Events; -using JasperFx.Events.Grouping; -using JasperFx.Events.Projections; -using Marten; -using Marten.Events; -using Marten.Events.Aggregation; -using Marten.Events.Projections; -using Marten.Storage; -using Marten.Testing.Harness; -using Xunit; -using Xunit.Abstractions; - -namespace EventSourcingTests.Bugs; - -public class document_and_event_operations_within_page : BugIntegrationContext -{ - private readonly ITestOutputHelper _output; - - public document_and_event_operations_within_page(ITestOutputHelper output) - { - _output = output; - } - - [Fact] - public async Task failure_due_to_ordering_change() - { - theStore.Options.Events.MetadataConfig.HeadersEnabled = true; - theStore.Options.Events.TenancyStyle = TenancyStyle.Conjoined; - theStore.Options.Events.StreamIdentity = StreamIdentity.AsGuid; - - theStore.Options.Projections.Add(ProjectionLifecycle.Inline); - theStore.Options.Projections.Add(ProjectionLifecycle.Inline); - theStore.Options.Projections.Add(ProjectionLifecycle.Inline); - - - using var session = theStore.LightweightSession("tenant"); - session.Logger = new TestOutputMartenLogger(_output); - - var mrCreated = new SamplesRolledUpCreated(Guid.NewGuid()); - session.Events.StartStream(mrCreated.Id, mrCreated); - await session.SaveChangesAsync(); - - for (var count = 1; count <= 2; count++) - { - var sampleAdded = new SampleAdded(mrCreated.Id, 23); - session.Events.Append(sampleAdded.Id, sampleAdded); - await session.SaveChangesAsync(); - } - - session.Events.Append(mrCreated.Id, new SamplesRolledUpPublished(mrCreated.Id)); - await session.SaveChangesAsync(); - } -} - - -public record SamplesRolledUp(Guid Id, List Samples, bool Published); - - -public record SamplesRolledUpCreated(Guid Id); - -public record SamplesRolledUpPublished(Guid Id); - -public class SamplesRolledUpProjection: SingleStreamProjection -{ - public SamplesRolledUpProjection() - { - CreateEvent(x => new SamplesRolledUp(x.Id, new List(), false)); - ProjectEvent((x, y) => - { - var existing = x.Samples; - existing.Add(y.Id); - return x with { Samples = existing }; - }); - ProjectEvent(x => x with { Published = true }); - } -} - - -public record SampleView(Guid Id, int Value, bool Published); - -public record SampleAdded(Guid Id, int Value); - -public class SampleProjection : MultiStreamProjection -{ - public SampleProjection() - { - CustomGrouping(new SampleGrouper()); - Identity(x => x.Id); - - CreateEvent(x=> new SampleView(x.Id, x.Value, false)); - - ProjectEvent(x=> x with { Published = true }); - - } -} - -public sealed class SampleGrouper: IAggregateGrouper -{ - public async Task Group(IQuerySession session, IEnumerable events, IEventGrouping grouping) - { - var publishedEvents = events.OfType>().ToArray(); - - foreach (var published in publishedEvents) - { - var sample = - await session.Events.AggregateStreamAsync(published.Data.Id, published.Version); - - foreach (var sampleEvent in sample!.Samples) - { - grouping.AddEvents(sampleEvent, publishedEvents); - } - } - } -} - -public record SampleEventView(Guid Id); - -public class SampleEventProjection: EventProjection -{ - public SampleEventProjection() - { - Project(((added, operations) => - { - operations.Store(new SampleEventView(added.Id)); - })); - - ProjectAsync(((async (published,operations, cancellation) => - { - var rolledUp = await operations.Events.AggregateStreamAsync(published.Id, token:cancellation); - foreach (var rolledUpSample in rolledUp.Samples) - { - operations.Store(new SampleEventView(rolledUpSample)); - } - }))); - } -} - - - - diff --git a/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs b/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs index 6088233356..b2c6e3b350 100644 --- a/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs +++ b/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs @@ -553,6 +553,35 @@ public void Apply(EEvent _) ECount++; } + protected bool Equals(SimpleAggregate other) + { + return Version == other.Version && Id.Equals(other.Id) && ACount == other.ACount && BCount == other.BCount && CCount == other.CCount && DCount == other.DCount && ECount == other.ECount; + } + + public override bool Equals(object obj) + { + if (obj is null) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((SimpleAggregate)obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(Version, Id, ACount, BCount, CCount, DCount, ECount); + } } @@ -635,6 +664,36 @@ public void Apply(EEvent _) { ECount++; } + + protected bool Equals(SimpleAggregateAsString other) + { + return Version == other.Version && Id == other.Id && ACount == other.ACount && BCount == other.BCount && CCount == other.CCount && DCount == other.DCount && ECount == other.ECount; + } + + public override bool Equals(object obj) + { + if (obj is null) + { + return false; + } + + if (ReferenceEquals(this, obj)) + { + return true; + } + + if (obj.GetType() != GetType()) + { + return false; + } + + return Equals((SimpleAggregateAsString)obj); + } + + public override int GetHashCode() + { + return HashCode.Combine(Version, Id, ACount, BCount, CCount, DCount, ECount); + } } public class Totals diff --git a/src/EventSourcingTests/Projections/MultiStreamProjections/rolling_up_by_tenant.cs b/src/EventSourcingTests/Projections/MultiStreamProjections/rolling_up_by_tenant.cs index ec89dcf693..199bd81027 100644 --- a/src/EventSourcingTests/Projections/MultiStreamProjections/rolling_up_by_tenant.cs +++ b/src/EventSourcingTests/Projections/MultiStreamProjections/rolling_up_by_tenant.cs @@ -21,7 +21,6 @@ public async Task track_totals_by_tenant_id() { opts.Events.TenancyStyle = TenancyStyle.Conjoined; opts.Projections.Add(ProjectionLifecycle.Async); - opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; }); var session1 = theStore.LightweightSession("one"); @@ -59,7 +58,6 @@ public async Task track_totals_by_tenant_id_using_strong_typed_id() { opts.Events.TenancyStyle = TenancyStyle.Conjoined; opts.Projections.Add(ProjectionLifecycle.Async); - opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; }); var session1 = theStore.LightweightSession("one"); diff --git a/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs b/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs index e9ab9cbc28..317af5a130 100644 --- a/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs +++ b/src/EventSourcingTests/Projections/MultiTenants/ConjoinedTenancyProjectionsTests.cs @@ -8,6 +8,7 @@ using JasperFx.Events; using JasperFx.Events.Aggregation; using JasperFx.Events.Daemon; +using JasperFx.Events.Grouping; using JasperFx.Events.Projections; using Marten; using Marten.Events; @@ -34,7 +35,6 @@ public async Task ForEventsAppendedToTenantedSession_AndConjoinedTenancyProjecti { opts.Policies.AllDocumentsAreMultiTenanted(); opts.Events.TenancyStyle = TenancyStyle.Conjoined; - opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; opts.Schema.For().SingleTenanted(); @@ -109,7 +109,6 @@ public async Task ForEventsAppendedToTenantedSession_CustomProjection() { opts.Policies.AllDocumentsAreMultiTenanted(); opts.Events.TenancyStyle = TenancyStyle.Conjoined; - opts.Events.EnableGlobalProjectionsForConjoinedTenancy = true; opts.Schema.For().SoftDeleted(); opts.Projections.Add(new CompanyLocationCustomProjection(), ProjectionLifecycle.Inline); @@ -191,6 +190,7 @@ public class ResourcesGlobalSummaryProjection: MultiStreamProjection(e => e.OrganisationId); Identity(e => e.OrganisationId); } diff --git a/src/EventSourcingTests/Projections/custom_transformation_of_events.cs b/src/EventSourcingTests/Projections/custom_transformation_of_events.cs index 2c5d801974..6530477232 100644 --- a/src/EventSourcingTests/Projections/custom_transformation_of_events.cs +++ b/src/EventSourcingTests/Projections/custom_transformation_of_events.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using JasperFx; using JasperFx.Events; +using JasperFx.Events.Grouping; using JasperFx.Events.Projections; using Marten.Events; using Marten.Events.Projections; @@ -35,12 +36,12 @@ public class project_events_from_multiple_streams_into_view: OneOffConfiguration [Fact] public async Task updateonly_event_for_custom_view_projection_should_not_create_new_document() { - StoreOptions(_ => + StoreOptions(opts => { - _.AutoCreateSchemaObjects = AutoCreate.All; - _.Events.TenancyStyle = Marten.Storage.TenancyStyle.Conjoined; - _.Schema.For().MultiTenanted(); - _.Projections.Add(new NewsletterSubscriptionProjection(), ProjectionLifecycle.Inline); + opts.AutoCreateSchemaObjects = AutoCreate.All; + opts.Events.TenancyStyle = Marten.Storage.TenancyStyle.Conjoined; + opts.Schema.For().MultiTenanted(); + opts.Projections.Add(new NewsletterSubscriptionProjection(), ProjectionLifecycle.Inline); }); @@ -194,6 +195,8 @@ public NewsletterSubscriptionProjection() Identity(x => x.SubscriptionId); DeleteEvent(); + + TenancyGrouping = TenancyGrouping.AcrossTenants; } public void Apply(NewsletterSubscription view, ReaderSubscribed @event) diff --git a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs index ba4a787fbe..1884f76974 100644 --- a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs +++ b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream.cs @@ -194,90 +194,6 @@ await When.CalledForEachAsync(tenants, async (tenantId, index) => ); } - [Theory] - [MemberData(nameof(SessionParams))] - public async Task open_persisted_stream_in_new_store_with_same_settings(TenancyStyle tenancyStyle, string[] tenants) - { - var store = ConfigureStore(tenancyStyle); - var questId = Guid.NewGuid(); - - await When.CalledForEachAsync(tenants, async (tenantId, index) => - { - using (var session = store.LightweightSession(tenantId)) - { - //Note "Id = questId" @see live_aggregate_equals_inlined_aggregate... - var started = new QuestStarted { Id = questId, Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); - - session.Events.StartStream(questId, started, joined1); - await session.SaveChangesAsync(); - } - - // events-aggregate-on-the-fly - works with same store - using (var session = store.LightweightSession(tenantId)) - { - // questId is the id of the stream - var party = await session.Events.AggregateStreamAsync(questId); - - party.Id.ShouldBe(questId); - party.ShouldNotBeNull(); - - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - - party_at_version_3.ShouldNotBeNull(); - - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTimeOffset.UtcNow.AddDays(-1)); - party_yesterday.ShouldBeNull(); - } - - using (var session = store.LightweightSession(tenantId)) - { - var party = await session.LoadAsync(questId); - party.Id.ShouldBe(questId); - } - - var newStore = ConfigureStore(tenancyStyle, false); - - //Inline is working - using (var session = store.LightweightSession(tenantId)) - { - var party = await session.LoadAsync(questId); - party.ShouldNotBeNull(); - } - - //GetAll - using (var session = store.LightweightSession(tenantId)) - { - var parties = session.Events.QueryRawEventDataOnly().ToArray(); - foreach (var party in parties) - { - party.ShouldNotBeNull(); - } - } - - //This AggregateStream fail with NPE - using (var session = store.LightweightSession(tenantId)) - { - // questId is the id of the stream - var party = await session.Events.AggregateStreamAsync(questId); //Here we get NPE - party.Id.ShouldBe(questId); - - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - party_at_version_3.Id.ShouldBe(questId); - - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTimeOffset.UtcNow.AddDays(-1)); - party_yesterday.ShouldBeNull(); - } - }).ShouldThrowIfAsync( - (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || - (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) - ); - } - [Theory] [MemberData(nameof(SessionParams))] public async Task query_before_saving(TenancyStyle tenancyStyle, string[] tenants) diff --git a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs index bca859992b..50a8c8b4a9 100644 --- a/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs +++ b/src/EventSourcingTests/QuickAppend/quick_append_event_capture_and_fetching_the_stream_with_string_identifiers.cs @@ -151,6 +151,9 @@ public async Task open_persisted_stream_in_new_store_with_same_settings() { var questId = "Sixth"; + await theStore.Advanced.Clean.DeleteAllEventDataAsync(); + await theStore.Advanced.Clean.DeleteDocumentsByTypeAsync(typeof(QuestPartyWithStringIdentifier)); + using (var session = theStore.LightweightSession()) { //Note "Id = questId" @see live_aggregate_equals_inlined_aggregate... @@ -172,7 +175,7 @@ public async Task open_persisted_stream_in_new_store_with_same_settings() var party_at_version_3 = await session.Events .AggregateStreamAsync(questId, 3); - party_at_version_3.ShouldNotBeNull(); + party_at_version_3.ShouldBeNull(); var party_yesterday = await session.Events .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); @@ -211,10 +214,6 @@ public async Task open_persisted_stream_in_new_store_with_same_settings() var party = await session.Events.AggregateStreamAsync(questId); //Here we get NPE party.ShouldNotBeNull(); - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - party_at_version_3.ShouldNotBeNull(); - var party_yesterday = await session.Events .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); party_yesterday.ShouldBeNull(); diff --git a/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream.cs b/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream.cs index 208a2b09bb..4621732499 100644 --- a/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream.cs +++ b/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream.cs @@ -209,90 +209,6 @@ await When.CalledForEachAsync(tenants, async (tenantId, index) => ); } - [Theory] - [MemberData(nameof(SessionParams))] - public async Task open_persisted_stream_in_new_store_with_same_settings(TenancyStyle tenancyStyle, string[] tenants) - { - var store = InitStore(tenancyStyle); - var questId = Guid.NewGuid(); - - await When.CalledForEachAsync(tenants, async (tenantId, index) => - { - using (var session = store.LightweightSession(tenantId)) - { - //Note "Id = questId" @see live_aggregate_equals_inlined_aggregate... - var started = new QuestStarted { Id = questId, Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); - - session.Events.StartStream(questId, started, joined1); - await session.SaveChangesAsync(); - } - - // events-aggregate-on-the-fly - works with same store - using (var session = store.LightweightSession(tenantId)) - { - // questId is the id of the stream - var party = await session.Events.AggregateStreamAsync(questId); - - party.Id.ShouldBe(questId); - party.ShouldNotBeNull(); - - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - - party_at_version_3.ShouldNotBeNull(); - - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTimeOffset.UtcNow.AddDays(-1)); - party_yesterday.ShouldBeNull(); - } - - using (var session = store.LightweightSession(tenantId)) - { - var party = await session.LoadAsync(questId); - party.Id.ShouldBe(questId); - } - - var newStore = InitStore(tenancyStyle, false); - - //Inline is working - using (var session = store.LightweightSession(tenantId)) - { - var party = await session.LoadAsync(questId); - party.ShouldNotBeNull(); - } - - //GetAll - using (var session = store.LightweightSession(tenantId)) - { - var parties = session.Events.QueryRawEventDataOnly().ToArray(); - foreach (var party in parties) - { - party.ShouldNotBeNull(); - } - } - - //This AggregateStream fail with NPE - using (var session = store.LightweightSession(tenantId)) - { - // questId is the id of the stream - var party = await session.Events.AggregateStreamAsync(questId); //Here we get NPE - party.Id.ShouldBe(questId); - - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - party_at_version_3.Id.ShouldBe(questId); - - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTimeOffset.UtcNow.AddDays(-1)); - party_yesterday.ShouldBeNull(); - } - }).ShouldThrowIfAsync( - (tenancyStyle == TenancyStyle.Single && tenants.Length > 1) || - (tenancyStyle == TenancyStyle.Conjoined && tenants.SequenceEqual(SameTenants)) - ); - } - [Theory] [MemberData(nameof(SessionParams))] public async Task query_before_saving(TenancyStyle tenancyStyle, string[] tenants) @@ -647,22 +563,20 @@ await When.CalledForEachAsync(tenants, async (tenantId, index) => private DocumentStore InitStore(TenancyStyle tenancyStyle, bool cleanSchema = true, bool useAppendEventForUpdateLock = false) { - var store = StoreOptions(_ => + var store = StoreOptions(opts => { - _.Events.TenancyStyle = tenancyStyle; - - _.AutoCreateSchemaObjects = AutoCreate.All; + opts.Events.TenancyStyle = tenancyStyle; if (tenancyStyle == TenancyStyle.Conjoined) - _.Policies.AllDocumentsAreMultiTenanted(); + opts.Policies.AllDocumentsAreMultiTenanted(); - _.Connection(ConnectionSource.ConnectionString); + opts.Connection(ConnectionSource.ConnectionString); - _.Projections.Snapshot(SnapshotLifecycle.Inline); + opts.Projections.Snapshot(SnapshotLifecycle.Inline); - _.Events.AddEventType(typeof(MembersJoined)); - _.Events.AddEventType(typeof(MembersDeparted)); - _.Events.AddEventType(typeof(QuestStarted)); + opts.Events.AddEventType(typeof(MembersJoined)); + opts.Events.AddEventType(typeof(MembersDeparted)); + opts.Events.AddEventType(typeof(QuestStarted)); }, cleanSchema); diff --git a/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream_with_string_identifiers.cs b/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream_with_string_identifiers.cs index 490b0b34b6..7746580e9c 100644 --- a/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream_with_string_identifiers.cs +++ b/src/EventSourcingTests/end_to_end_event_capture_and_fetching_the_stream_with_string_identifiers.cs @@ -150,81 +150,6 @@ public async Task live_aggregate_equals_inlined_aggregate_without_hidden_contrac } } - [Fact] - public async Task open_persisted_stream_in_new_store_with_same_settings() - { - var questId = "Sixth"; - - using (var session = theStore.LightweightSession()) - { - //Note "Id = questId" @see live_aggregate_equals_inlined_aggregate... - var started = new QuestStarted { Name = "Destroy the One Ring" }; - var joined1 = new MembersJoined(1, "Hobbiton", "Frodo", "Merry"); - - session.Events.StartStream(questId, started, joined1); - await session.SaveChangesAsync(); - } - - // events-aggregate-on-the-fly - works with same store - using (var session = theStore.LightweightSession()) - { - // questId is the id of the stream - var party = await session.Events.AggregateStreamAsync(questId); - - party.ShouldNotBeNull(); - - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - - party_at_version_3.ShouldNotBeNull(); - - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); - party_yesterday.ShouldBeNull(); - } - - using (var session = theStore.LightweightSession()) - { - var party = await session.LoadAsync(questId); - party.ShouldNotBeNull(); - } - - var newStore = new DocumentStore(theStore.Options); - - //Inline is working - using (var session = newStore.LightweightSession()) - { - var party = await session.LoadAsync(questId); - party.ShouldNotBeNull(); - } - - //GetAll - using (var session = theStore.LightweightSession()) - { - var parties = session.Events.QueryRawEventDataOnly().ToArray(); - foreach (var party in parties) - { - party.ShouldNotBeNull(); - } - } - - //This AggregateStream fail with NPE - using (var session = newStore.LightweightSession()) - { - // questId is the id of the stream - var party = await session.Events.AggregateStreamAsync(questId); //Here we get NPE - party.ShouldNotBeNull(); - - var party_at_version_3 = await session.Events - .AggregateStreamAsync(questId, 3); - party_at_version_3.ShouldNotBeNull(); - - var party_yesterday = await session.Events - .AggregateStreamAsync(questId, timestamp: DateTime.UtcNow.AddDays(-1)); - party_yesterday.ShouldBeNull(); - } - } - [Fact] public async Task query_before_saving() { diff --git a/src/LinqTestsTypes/LinqTestsTypes.csproj b/src/LinqTestsTypes/LinqTestsTypes.csproj index 1e283636c0..a3958952df 100644 --- a/src/LinqTestsTypes/LinqTestsTypes.csproj +++ b/src/LinqTestsTypes/LinqTestsTypes.csproj @@ -14,6 +14,6 @@ - + diff --git a/src/Marten/DocumentStore.EventStore.cs b/src/Marten/DocumentStore.EventStore.cs index d9a075e5ed..84ac7464c6 100644 --- a/src/Marten/DocumentStore.EventStore.cs +++ b/src/Marten/DocumentStore.EventStore.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using JasperFx; using JasperFx.Core; +using JasperFx.Core.Reflection; using JasperFx.Descriptors; using JasperFx.Events; using JasperFx.Events.Aggregation; @@ -41,13 +42,13 @@ static DocumentStore() ProjectionExceptions.RegisterTransientExceptionType(); } - DatabaseCardinality IEventStore.DatabaseCardinality => Options.Tenancy.Cardinality; + DatabaseCardinality IEventStore.DatabaseCardinality => Options.Tenancy.As().Cardinality; bool IEventStore.HasMultipleTenants { get { - if (Options.Tenancy.Cardinality != DatabaseCardinality.Single) return true; + if (Options.Tenancy.As().Cardinality != DatabaseCardinality.Single) return true; if (Options.Events.TenancyStyle == TenancyStyle.Conjoined) return true; diff --git a/src/Marten/DocumentStore.MultiTenancy.cs b/src/Marten/DocumentStore.MultiTenancy.cs new file mode 100644 index 0000000000..c7914a35fa --- /dev/null +++ b/src/Marten/DocumentStore.MultiTenancy.cs @@ -0,0 +1,32 @@ +using System.Threading.Tasks; +using Marten.Storage; +using Weasel.Core.MultiTenancy; + +namespace Marten; + +public partial class DocumentStore : IMasterTableMultiTenancy +{ + async Task IMasterTableMultiTenancy.TryAddTenantDatabaseRecordsAsync(string tenantId, string connectionString) + { + var tenancy = Options.Tenancy as MasterTableTenancy; + if (tenancy is null) + { + return false; + } + + await tenancy.AddDatabaseRecordAsync(tenantId, connectionString).ConfigureAwait(false); + return true; + } + + async Task IMasterTableMultiTenancy.ClearAllDatabaseRecordsAsync() + { + var tenancy = Options.Tenancy as MasterTableTenancy; + if (tenancy is null) + { + return false; + } + + await tenancy.ClearAllDatabaseRecordsAsync().ConfigureAwait(false); + return true; + } +} diff --git a/src/Marten/Events/Aggregation/SingleStreamProjection.cs b/src/Marten/Events/Aggregation/SingleStreamProjection.cs index 274f60241e..c1d3778e0b 100644 --- a/src/Marten/Events/Aggregation/SingleStreamProjection.cs +++ b/src/Marten/Events/Aggregation/SingleStreamProjection.cs @@ -46,12 +46,9 @@ public IEnumerable ValidateConfiguration(StoreOptions options) foreach (var p in validateDocumentIdentity(options, mapping)) yield return p; - if (options.Events.TenancyStyle != mapping.TenancyStyle - && (options.Events.TenancyStyle == TenancyStyle.Single - || (options.Events is - { TenancyStyle: TenancyStyle.Conjoined, EnableGlobalProjectionsForConjoinedTenancy: false } - && Lifecycle != ProjectionLifecycle.Live)) - ) + if (options.Events.TenancyStyle != mapping.TenancyStyle) + + if (Lifecycle != ProjectionLifecycle.Live && options.Events.TenancyStyle != mapping.TenancyStyle) { yield return $"Tenancy storage style mismatch between the events ({options.Events.TenancyStyle}) and the aggregate type {typeof(TDoc).FullNameInCode()} ({mapping.TenancyStyle})"; diff --git a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs index 5f9daa46c0..62874d7d9c 100644 --- a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs +++ b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs @@ -189,6 +189,13 @@ private async Task executeAsync(CancellationToken stoppingToken) { var daemon = resolveDaemon(set); + // If any agents are known to be stopped, we need to just shut + // down everything and release the lock, then let some other node pick that up + if (anyAgentsAreStoppped(set, daemon)) + { + await stopAndReleaseProjectionSet(set, daemon).ConfigureAwait(false); + } + // check if it's still running await startAgentsIfNecessaryAsync(set, daemon, stoppingToken).ConfigureAwait(false); } @@ -240,6 +247,31 @@ await Task.Delay(_options.Projections.LeadershipPollingTime.Milliseconds(), stop } } + private async Task stopAndReleaseProjectionSet(IProjectionSet set, IProjectionDaemon daemon) + { + // No, shut them all down!!!! + foreach (var shardName in set.Names) + { + await daemon.StopAgentAsync(shardName.Identity).ConfigureAwait(false); + } + + await Distributor.ReleaseLockAsync(set).ConfigureAwait(false); + } + + private bool anyAgentsAreStoppped(IProjectionSet set, IProjectionDaemon daemon) + { + foreach (var name in set.Names) + { + var status = daemon.StatusFor(name.Identity); + if (status == AgentStatus.Stopped) + { + return true; + } + } + + return false; + } + private async Task startAgentsIfNecessaryAsync(IProjectionSet set, IProjectionDaemon daemon, CancellationToken stoppingToken) { @@ -250,7 +282,7 @@ private async Task startAgentsIfNecessaryAsync(IProjectionSet set, { await tryStartAgent(stoppingToken, daemon, name, set).ConfigureAwait(false); } - else if (agent.Status == AgentStatus.Paused && agent.PausedTime.HasValue && + else if (agent is { Status: AgentStatus.Paused, PausedTime: not null } && _timeProvider.GetUtcNow().Subtract(agent.PausedTime.Value) > _options.Projections.HealthCheckPollingTime) { diff --git a/src/Marten/Events/Daemon/Progress/UpdateProjectionProgress.cs b/src/Marten/Events/Daemon/Progress/UpdateProjectionProgress.cs index 5e002f0b47..3003e38472 100644 --- a/src/Marten/Events/Daemon/Progress/UpdateProjectionProgress.cs +++ b/src/Marten/Events/Daemon/Progress/UpdateProjectionProgress.cs @@ -4,6 +4,7 @@ using System.Threading; using System.Threading.Tasks; using JasperFx.Events; +using JasperFx.Events.Daemon; using JasperFx.Events.Projections; using Marten.Events.Daemon.Internals; using Marten.Exceptions; diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 05f544c52b..16cba5e1fa 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -148,8 +148,6 @@ public StreamIdentity StreamIdentity /// public TenancyStyle TenancyStyle { get; set; } = TenancyStyle.Single; - public bool EnableGlobalProjectionsForConjoinedTenancy { get; set; } - public bool UseIdentityMapForAggregates { get; set; } /// diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index 5b9e23ca33..08a0cb5bd2 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -28,11 +28,6 @@ public interface IEventStoreOptions /// TenancyStyle TenancyStyle { get; set; } - /// - /// Enables global project projections (with single tenancy style) for events with conjoined tenancy - /// - bool EnableGlobalProjectionsForConjoinedTenancy { get; set; } - /// /// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while /// running in an Inline lifecycle. Default is false; diff --git a/src/Marten/Events/IQueryEventStore.cs b/src/Marten/Events/IQueryEventStore.cs index be5630ab78..680e2d0c4e 100644 --- a/src/Marten/Events/IQueryEventStore.cs +++ b/src/Marten/Events/IQueryEventStore.cs @@ -64,6 +64,32 @@ Task> FetchStreamAsync(string streamKey, long version = 0, Task AggregateStreamAsync(string streamKey, long version = 0, DateTimeOffset? timestamp = null, T? state = null, long fromVersion = 0, CancellationToken token = default) where T : class; + /// + /// Perform a live aggregation of the raw events in this stream to a T object, but return the last known + /// version of the aggregate in case the aggregate itself is marked as deleted at a specific version + /// + /// + /// + /// If set, queries for events up to and including this version + /// If set, queries for events captured on or before this timestamp + /// + /// + Task AggregateStreamToLastKnownAsync(Guid streamId, long version = 0, DateTimeOffset? timestamp = null, + CancellationToken token = default) where T : class; + + /// + /// Perform a live aggregation of the raw events in this stream to a T object, but return the last known + /// version of the aggregate in case the aggregate itself is marked as deleted at a specific version + /// + /// + /// + /// If set, queries for events up to and including this version + /// If set, queries for events captured on or before this timestamp + /// + /// + Task AggregateStreamToLastKnownAsync(string streamKey, long version = 0, DateTimeOffset? timestamp = null, + CancellationToken token = default) where T : class; + /// /// Query directly against ONLY the raw event data. Use IQuerySession.Query() for aggregated documents! /// diff --git a/src/Marten/Events/Projections/MultiStreamProjection.cs b/src/Marten/Events/Projections/MultiStreamProjection.cs index ca4839db34..27bffdb475 100644 --- a/src/Marten/Events/Projections/MultiStreamProjection.cs +++ b/src/Marten/Events/Projections/MultiStreamProjection.cs @@ -25,7 +25,6 @@ namespace Marten.Events.Projections; /// public abstract class MultiStreamProjection: JasperFxMultiStreamProjectionBase, IMartenAggregateProjection, IValidatedProjection, IMartenRegistrable where TDoc : notnull where TId : notnull { - // TODO -- put the exception types in a constant somewhere protected MultiStreamProjection(): base() { } @@ -49,12 +48,6 @@ public int CacheLimitPerTenant set => Options.CacheLimitPerTenant = value; } - // TODO -- need to add this all the way back in JasperFx.Events - // public override SubscriptionDescriptor Describe() - // { - // return new SubscriptionDescriptor(this, SubscriptionType.MultiStreamProjection); - // } - [JasperFxIgnore] public IEnumerable ValidateConfiguration(StoreOptions options) { @@ -66,16 +59,21 @@ public IEnumerable ValidateConfiguration(StoreOptions options) $"Id type mismatch. The projection identity type is {typeof(TId).FullNameInCode()}, but the aggregate document {typeof(TDoc).FullNameInCode()} id type is {mapping.IdType.NameInCode()}"; } - // TODO -- revisit this with - if (options.Events.TenancyStyle != mapping.TenancyStyle - && (options.Events.TenancyStyle == TenancyStyle.Single - || options.Events is - { TenancyStyle: TenancyStyle.Conjoined, EnableGlobalProjectionsForConjoinedTenancy: false } - && Lifecycle != ProjectionLifecycle.Live) - ) + if (Lifecycle != ProjectionLifecycle.Live && options.Events.TenancyStyle == TenancyStyle.Conjoined && mapping.TenancyStyle == TenancyStyle.Single) { - yield return - $"Tenancy storage style mismatch between the events ({options.Events.TenancyStyle}) and the aggregate type {typeof(TDoc).FullNameInCode()} ({mapping.TenancyStyle})"; + if (TenancyGrouping == TenancyGrouping.RespectTenant) + { + yield return $"Tenancy storage style mismatch between the events ({options.Events.TenancyStyle}) and the aggregate type {typeof(TDoc).FullNameInCode()} ({mapping.TenancyStyle}) but the {nameof(TenancyGrouping)} is {TenancyGrouping}. Set to {TenancyGrouping.AcrossTenants} to explicitly enable the grouping across tenants"; + } + } + + if (Lifecycle != ProjectionLifecycle.Live && mapping.TenancyStyle == TenancyStyle.Conjoined && + options.Events.TenancyStyle == TenancyStyle.Single) + { + if (TenancyGrouping == TenancyGrouping.RespectTenant) + { + yield return $"Tenancy storage style mismatch between the events ({options.Events.TenancyStyle}) and the aggregate type {typeof(TDoc).FullNameInCode()} ({mapping.TenancyStyle}) but the {nameof(TenancyGrouping)} is {TenancyGrouping}. Set to {TenancyGrouping.AcrossTenants} to explicitly enable the grouping across tenants"; + } } if (mapping.DeleteStyle == DeleteStyle.SoftDelete && IsUsingConventionalMethods) diff --git a/src/Marten/Events/QueryEventStore.cs b/src/Marten/Events/QueryEventStore.cs index f20530218e..f4531ff3d6 100644 --- a/src/Marten/Events/QueryEventStore.cs +++ b/src/Marten/Events/QueryEventStore.cs @@ -79,6 +79,8 @@ public async Task> FetchStreamAsync(string streamKey, long return state; } + if (version != 0 && version > events.Last().Version) return null; + var aggregator = _store.Options.Projections.AggregatorFor(); var aggregate = await aggregator.BuildAsync(events, _session, state, token).ConfigureAwait(false); @@ -93,6 +95,35 @@ public async Task> FetchStreamAsync(string streamKey, long return aggregate; } + public async Task AggregateStreamToLastKnownAsync(Guid streamId, long version = 0, + DateTimeOffset? timestamp = null, + CancellationToken token = default) where T : class + { + var events = await FetchStreamAsync(streamId, version, timestamp, 0, token).ConfigureAwait(false); + if (!events.Any()) + { + return null; + } + + var aggregator = _store.Options.Projections.AggregatorFor(); + + T? aggregate = null; + while (aggregate == null && events.Any()) + { + aggregate = await aggregator.BuildAsync(events, _session, default, token).ConfigureAwait(false); + events = events.SkipLast(1).ToList(); + } + + if (aggregate != null) + { + var storage = _session.StorageFor(); + storage.SetIdentityFromGuid(aggregate, streamId); + } + + return aggregate; + } + + public async Task AggregateStreamAsync(string streamKey, long version = 0, DateTimeOffset? timestamp = null, T? state = null, long fromVersion = 0, CancellationToken token = default) where T : class { @@ -102,6 +133,8 @@ public async Task> FetchStreamAsync(string streamKey, long return state; } + if (version != 0 && version > events.Last().Version) return null; + var aggregator = _store.Options.Projections.AggregatorFor(); var aggregate = await aggregator.BuildAsync(events, _session, state, token).ConfigureAwait(false); @@ -115,6 +148,33 @@ public async Task> FetchStreamAsync(string streamKey, long return aggregate; } + public async Task AggregateStreamToLastKnownAsync(string streamKey, long version = 0, DateTimeOffset? timestamp = null, + CancellationToken token = default) where T : class + { + var events = await FetchStreamAsync(streamKey, version, timestamp, 0, token).ConfigureAwait(false); + if (!events.Any()) + { + return null; + } + + var aggregator = _store.Options.Projections.AggregatorFor(); + + T? aggregate = null; + while (aggregate == null && events.Any()) + { + aggregate = await aggregator.BuildAsync(events, _session, default, token).ConfigureAwait(false); + events = events.SkipLast(1).ToList(); + } + + if (aggregate != null) + { + var storage = _session.StorageFor(); + storage.SetIdentityFromString(aggregate, streamKey); + } + + return aggregate; + } + public IMartenQueryable QueryRawEventDataOnly() where T : notnull { _store.Events.AddEventType(); diff --git a/src/Marten/Exceptions/ProgressionProgressOutOfOrderException.cs b/src/Marten/Exceptions/ProgressionProgressOutOfOrderException.cs deleted file mode 100644 index 668ae173b6..0000000000 --- a/src/Marten/Exceptions/ProgressionProgressOutOfOrderException.cs +++ /dev/null @@ -1,12 +0,0 @@ -using JasperFx.Events.Projections; -using Marten.Events.Daemon; - -namespace Marten.Exceptions; - -public class ProgressionProgressOutOfOrderException: MartenException -{ - public ProgressionProgressOutOfOrderException(ShardName progressionOrShardName): base( - $"Progression '{progressionOrShardName}' is out of order. This may happen when multiple processes try to process the projection") - { - } -} diff --git a/src/Marten/Marten.csproj b/src/Marten/Marten.csproj index 107f4db251..a7fdfcad87 100644 --- a/src/Marten/Marten.csproj +++ b/src/Marten/Marten.csproj @@ -32,14 +32,13 @@ - + + - - - + diff --git a/src/Marten/MartenServiceCollectionExtensions.cs b/src/Marten/MartenServiceCollectionExtensions.cs index a98acd3dd7..5b160be93c 100644 --- a/src/Marten/MartenServiceCollectionExtensions.cs +++ b/src/Marten/MartenServiceCollectionExtensions.cs @@ -26,6 +26,7 @@ using Microsoft.Extensions.Logging.Abstractions; using Npgsql; using Weasel.Core.Migrations; +using Weasel.Core.MultiTenancy; namespace Marten; @@ -195,6 +196,8 @@ Func optionSource return new DocumentStore(options); }); + services.AddSingleton(s => (IMasterTableMultiTenancy)s.GetRequiredService()); + // This can be overridden by the expression following services.AddSingleton(sp => { @@ -299,6 +302,10 @@ public static MartenStoreExpression AddMartenStore(this IServiceCollection services.AddSingleton(s => config.Build(s)); + + services.AddSingleton(s => (IMasterTableMultiTenancy)s.GetRequiredService()); + + services.AddSingleton(s => { var options = config.BuildStoreOptions(s); diff --git a/src/Marten/MasterTableMultiTenancyExtensions.cs b/src/Marten/MasterTableMultiTenancyExtensions.cs deleted file mode 100644 index f289ca7822..0000000000 --- a/src/Marten/MasterTableMultiTenancyExtensions.cs +++ /dev/null @@ -1,46 +0,0 @@ -using System; -using System.Threading.Tasks; -using Marten.Storage; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; - -namespace Marten; - -public static class MasterTableMultiTenancyExtensions -{ - /// - /// Convenience method to clear all tenant database records - /// if using Marten - /// - /// - /// - /// - public static Task ClearAllTenantDatabaseRecordsAsync(this IHost host) - { - var store = host.Services.GetRequiredService() as DocumentStore; - var tenancy = store?.Options.Tenancy as MasterTableTenancy; - if (tenancy is null) - throw new InvalidOperationException("The Marten tenancy model is not using the master table tenancy"); - - return tenancy.ClearAllDatabaseRecordsAsync(); - - } - - /// - /// Convenience method to add a new tenant database to the master tenant table at runtime - /// - /// - /// - /// - /// - /// - public static Task AddTenantDatabaseAsync(this IHost host, string tenantId, string connectionString) - { - var store = host.Services.GetRequiredService() as DocumentStore; - var tenancy = store?.Options.Tenancy as MasterTableTenancy; - if (tenancy is null) - throw new InvalidOperationException("The Marten tenancy model is not using the master table tenancy"); - - return tenancy.AddDatabaseRecordAsync(tenantId, connectionString); - } -} diff --git a/src/Marten/Schema/DocumentMapping.cs b/src/Marten/Schema/DocumentMapping.cs index e758c22d7c..17a705c0a8 100644 --- a/src/Marten/Schema/DocumentMapping.cs +++ b/src/Marten/Schema/DocumentMapping.cs @@ -4,6 +4,7 @@ using System.Linq.Expressions; using System.Reflection; using System.Text.RegularExpressions; +using JasperFx; using JasperFx.Core; using JasperFx.Core.Reflection; using JasperFx.Descriptors; @@ -972,7 +973,7 @@ internal static class ForeignKeyExtensions public static void TryMoveTenantIdFirst(this ForeignKey foreignKey, DocumentMapping mapping) { // Guard clause, do nothing if this document is not tenanted or foreign key doesn't contain tenant id - if (mapping.TenancyStyle == TenancyStyle.Single || foreignKey.ColumnNames.Any(x => x != TenantIdColumn.Name)) return; + if (mapping.TenancyStyle == TenancyStyle.Single || !foreignKey.ColumnNames.Contains(StorageConstants.TenantIdColumn)) return; foreignKey.ColumnNames = new string[] { TenantIdColumn.Name } .Concat(foreignKey.ColumnNames.Where(x => x != TenantIdColumn.Name)).ToArray(); diff --git a/src/MultiTenancyTests/using_static_database_multitenancy.cs b/src/MultiTenancyTests/using_static_database_multitenancy.cs index 856187f0a8..dccc10bf00 100644 --- a/src/MultiTenancyTests/using_static_database_multitenancy.cs +++ b/src/MultiTenancyTests/using_static_database_multitenancy.cs @@ -85,7 +85,7 @@ public async Task InitializeAsync() [Fact] public async Task describing_the_database_usage() { - theStore.Options.Tenancy.Cardinality.ShouldBe(DatabaseCardinality.StaticMultiple); + theStore.Options.Tenancy.As().Cardinality.ShouldBe(DatabaseCardinality.StaticMultiple); var description = await theStore.Options.Tenancy.DescribeDatabasesAsync(CancellationToken.None); diff --git a/src/StressTests/StressTests.csproj b/src/StressTests/StressTests.csproj index bd848d68aa..753509c9e4 100644 --- a/src/StressTests/StressTests.csproj +++ b/src/StressTests/StressTests.csproj @@ -13,7 +13,6 @@ - @@ -23,6 +22,7 @@ + diff --git a/src/StressTests/using_multiple_document_stores_in_same_host.cs b/src/StressTests/using_multiple_document_stores_in_same_host.cs index 80e9983369..80f6dad8e6 100644 --- a/src/StressTests/using_multiple_document_stores_in_same_host.cs +++ b/src/StressTests/using_multiple_document_stores_in_same_host.cs @@ -20,6 +20,7 @@ using Shouldly; using Weasel.Core; using Weasel.Core.Migrations; +using Weasel.Core.MultiTenancy; using Xunit; namespace StressTests; @@ -56,6 +57,16 @@ public using_multiple_document_stores_in_same_host() }); } + [Fact] + public void all_stores_are_registered_as_master_table_tenancy() + { + var masterTableTenancyList = theContainer.GetAllInstances(); + masterTableTenancyList.Count.ShouldBe(3); + masterTableTenancyList.Any(x => x.GetType() == typeof(DocumentStore)).ShouldBeTrue(); + masterTableTenancyList.OfType().Any().ShouldBeTrue(); + masterTableTenancyList.OfType().Any().ShouldBeTrue(); + } + [Fact] public void get_all_document_stores() {