diff --git a/build/build.csproj b/build/build.csproj index daff97629c..e6deb0a983 100644 --- a/build/build.csproj +++ b/build/build.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs b/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs index 2f7e304ab4..2e3a4abb51 100644 --- a/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs +++ b/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs @@ -135,14 +135,14 @@ public async Task see_the_dead_letter_events() theSession.Logger = new TestOutputMartenLogger(_output); var skipped = await theSession.Query().ToListAsync(); - skipped.Where(x => x.ProjectionName == "CollateNames" && x.ShardName == "All") - .Select(x => x.EventSequence).OrderBy(x => x) - .ShouldHaveTheSameElementsAs(4, 5, 6, 7); - skipped.Where(x => x.ProjectionName == "NamedDocuments" && x.ShardName == "All") .Select(x => x.EventSequence).OrderBy(x => x) .ShouldHaveTheSameElementsAs(4, 5, 6, 7, 11, 14); + skipped.Where(x => x.ProjectionName == "CollateNames" && x.ShardName == "All") + .Select(x => x.EventSequence).OrderBy(x => x) + .ShouldHaveTheSameElementsAs(4, 5, 6, 7); + } } diff --git a/src/DaemonTests/wait_for_non_stale_data_error_cases.cs b/src/DaemonTests/wait_for_non_stale_data_error_cases.cs new file mode 100644 index 0000000000..21c92a41de --- /dev/null +++ b/src/DaemonTests/wait_for_non_stale_data_error_cases.cs @@ -0,0 +1,78 @@ +using System; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten.Events.Aggregation; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DaemonTests; + +public class wait_for_non_stale_data_error_cases : OneOffConfigurationsContext +{ + [Fact] + public async Task get_a_good_timeout_exception() + { + StoreOptions(opts => + { + opts.Projections.Errors.SkipApplyErrors = false; + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + theSession.Events.StartStream(new AEvent(), new BEvent(), new CEvent(), new DEvent()); + theSession.Events.StartStream(new AEvent(), new BEvent(), new CEvent(), new DEvent()); + theSession.Events.StartStream(new AEvent(), new BEvent(), new CEvent(), new DEvent()); + theSession.Events.StartStream(new AEvent(), new ThrowError(false), new CEvent(), new DEvent()); + theSession.Events.StartStream(new AEvent(), new BEvent(), new CEvent(), new DEvent()); + theSession.Events.StartStream(new AEvent(), new BEvent(), new ThrowError(true), new DEvent()); + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + + await daemon.StartAllAsync(); + + var aggregated = await Should.ThrowAsync(async () => + { + await daemon.WaitForNonStaleData(5.Seconds()); + }); + + aggregated.InnerExceptions[0].ShouldBeOfType(); + aggregated.InnerExceptions[1].ShouldBeOfType(); + + + } +} + +public record ThrowError(bool ShouldThrow); + +public class SometimesFailingLetterCountsProjection: SingleStreamProjection +{ + public override LetterCounts Evolve(LetterCounts snapshot, Guid id, IEvent e) + { + snapshot ??= new LetterCounts { Id = id }; + switch (e.Data) + { + case AEvent _: + snapshot.ACount++; + break; + case BEvent _: + snapshot.BCount++; + break; + case CEvent _: + snapshot.CCount++; + break; + case DEvent _: + snapshot.DCount++; + break; + case ThrowError x: + if (x.ShouldThrow) throw new Exception("You stink!"); + break; + } + + return snapshot; + } +} diff --git a/src/EventSourcingTests/Bugs/Bug_3942_string_only_record.cs b/src/EventSourcingTests/Bugs/Bug_3942_string_only_record.cs new file mode 100644 index 0000000000..97d93869f7 --- /dev/null +++ b/src/EventSourcingTests/Bugs/Bug_3942_string_only_record.cs @@ -0,0 +1,51 @@ +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten.Events.Aggregation; +using Marten.Testing.Harness; +using Xunit; +using System.Threading.Tasks; + +namespace EventSourcingTests.Bugs; + +public class single_property_async : BugIntegrationContext +{ + public single_property_async() + { + StoreOptions(o => + { + o.Events.StreamIdentity = StreamIdentity.AsString; + o.Projections.Add(ProjectionLifecycle.Async); + }, true); + } + + [Fact] + public async Task start_and_append_events() + { + await using var session = theStore.LightweightSession(); + + var stream = session.Events.StartStream("key", new SinglePropCreate()); + + await session.SaveChangesAsync(); + + var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(20.Seconds()); + + var aggregate = await theSession.LoadAsync(stream.Key!); + + Assert.NotNull(aggregate); + } +} + +public class SingleProjection: SingleStreamProjection +{ + public SingleProp Create(IEvent @event) + { + return new SingleProp(@event.StreamKey); + } +} + +public record SingleProp(string Id); + +public record SinglePropCreate; diff --git a/src/EventSourcingTests/determining_the_event_store_identity.cs b/src/EventSourcingTests/determining_the_event_store_identity.cs new file mode 100644 index 0000000000..f431b111aa --- /dev/null +++ b/src/EventSourcingTests/determining_the_event_store_identity.cs @@ -0,0 +1,42 @@ +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Core.Reflection; +using JasperFx.Events; +using Marten; +using Marten.Testing.Harness; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace EventSourcingTests; + +public class determining_the_event_store_identity +{ + [Fact] + public async Task use_correct_identities() + { + using var host = await Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services.AddMarten(m => + { + m.Connection(ConnectionSource.ConnectionString); + m.DatabaseSchemaName = "es_identity"; + }); + + services.AddMartenStore(m => + { + m.Connection(ConnectionSource.ConnectionString); + m.DatabaseSchemaName = "things"; + }); + + }).StartAsync(); + + var stores = host.Services.GetServices().ToArray(); + stores.Single(x => x.GetType() == typeof(DocumentStore)).As().Identity.ShouldBe(new EventStoreIdentity("main", "marten")); + stores.OfType().Single().As().Identity.ShouldBe(new EventStoreIdentity("ithingstore", "marten")); + } +} + +public interface IThingStore: IDocumentStore; diff --git a/src/LinqTestsTypes/LinqTestsTypes.csproj b/src/LinqTestsTypes/LinqTestsTypes.csproj index 7de993b562..3acd379458 100644 --- a/src/LinqTestsTypes/LinqTestsTypes.csproj +++ b/src/LinqTestsTypes/LinqTestsTypes.csproj @@ -14,6 +14,6 @@ - + diff --git a/src/Marten/DocumentStore.EventStore.cs b/src/Marten/DocumentStore.EventStore.cs index 385913cb0f..7a976b765b 100644 --- a/src/Marten/DocumentStore.EventStore.cs +++ b/src/Marten/DocumentStore.EventStore.cs @@ -35,7 +35,6 @@ namespace Marten; public partial class DocumentStore: IEventStore, ISubscriptionRunner { - static DocumentStore() { ProjectionExceptions.RegisterTransientExceptionType(); @@ -58,6 +57,8 @@ bool IEventStore.HasMultipleTenants } } + public EventStoreIdentity Identity { get; } + IEventRegistry IEventStore.Registry => Options.EventGraph; public Type IdentityTypeForProjectedType(Type aggregateType) @@ -209,11 +210,11 @@ public async ValueTask> Sta var projectionBatch = new ProjectionBatch(session, batch, mode); if (range.SequenceFloor == 0) { - batch.Queue.Post(new InsertProjectionProgress(session.Options.EventGraph, range)); + await batch.Queue.PostAsync(new InsertProjectionProgress(session.Options.EventGraph, range)).ConfigureAwait(false); } else { - batch.Queue.Post(new UpdateProjectionProgress(session.Options.EventGraph, range)); + await batch.Queue.PostAsync(new UpdateProjectionProgress(session.Options.EventGraph, range)).ConfigureAwait(false); } return projectionBatch; @@ -288,7 +289,7 @@ async Task ISubscriptionRunner.ExecuteAsync(ISubscription subscri };; // Mark the progression - batch.Queue.Post(range.BuildProgressionOperation(Events)); + await batch.Queue.PostAsync(range.BuildProgressionOperation(Events)).ConfigureAwait(false); await using var session = new ProjectionDocumentSession(this, batch, new SessionOptions diff --git a/src/Marten/DocumentStore.cs b/src/Marten/DocumentStore.cs index 3e56b5272a..cfe1832bf9 100644 --- a/src/Marten/DocumentStore.cs +++ b/src/Marten/DocumentStore.cs @@ -83,6 +83,8 @@ public DocumentStore(StoreOptions options) decorator.ReadEventTypes(options.EventGraph); } + + Identity = new(Options.StoreName.ToLowerInvariant(), "marten"); } public ITenancy Tenancy => Options.Tenancy; @@ -391,6 +393,17 @@ public async ValueTask BuildProjectionDaemonAsync( return database.As().StartProjectionDaemon(this, logger); } + public async ValueTask BuildProjectionDaemonAsync(DatabaseId id) + { + var logger = Options.LogFactory?.CreateLogger() ?? Options.DotNetLogger ?? NullLogger.Instance; + + var database = await Tenancy.FindDatabase(id).ConfigureAwait(false); + + await database.EnsureStorageExistsAsync(typeof(IEvent)).ConfigureAwait(false); + + return database.As().StartProjectionDaemon(this, logger); + } + [Obsolete( """ Opening a session without explicitly providing desired type may be dropped in next Marten version. diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs index ad6fb2c740..2dd972dc6d 100644 --- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs +++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs @@ -1,12 +1,16 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; +using JasperFx.CommandLine.TextualDisplays; using JasperFx.Core; using JasperFx.Core.Reflection; +using JasperFx.Events.Daemon; using JasperFx.Events.Projections; using Marten.Events.Daemon; +using Marten.Services; using Marten.Storage; using Microsoft.Extensions.Hosting; @@ -85,6 +89,12 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase // Number of active projection shards, plus the high water mark var projectionsCount = database.As().Options.Projections.AllShards().Count + 1; + // Just get out of there if there are no projections + if (projectionsCount == 1) + { + return; + } + using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(timeout); @@ -105,33 +115,62 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase throw new TimeoutException("No event activity was detected within the timeout span"); } - IReadOnlyList projections; - do + IReadOnlyList projections = []; + try { - projections = await database.AllProjectionProgress(cancellationSource.Token).ConfigureAwait(false); - if ((projections.Count >= projectionsCount && - projections.All(x => x.Sequence >= initial.EventSequenceNumber)) - || cancellationSource.IsCancellationRequested) + do { - break; - } - - await Task.Delay(250.Milliseconds(), cancellationSource.Token).ConfigureAwait(false); - } while (true); + projections = await database.AllProjectionProgress(cancellationSource.Token).ConfigureAwait(false); + if ((projections.Count >= projectionsCount && + projections.All(x => x.Sequence >= initial.EventSequenceNumber)) + || cancellationSource.IsCancellationRequested) + { + break; + } + + await Task.Delay(250.Milliseconds(), cancellationSource.Token).ConfigureAwait(false); + } while (true); + } + catch (TaskCanceledException) + { + // We just didn't finish + } if (projections.Count < projectionsCount) { - throw new TimeoutException( - $"The projection shards (in total of {projectionsCount}) haven't been completely started within the timeout span"); + var writer = new StringWriter(); + await writer.WriteLineAsync($"The projection shards (in total of {projectionsCount}) haven't been completely started within the timeout span").ConfigureAwait(false); + await writer.WriteLineAsync().ConfigureAwait(false); + await writer.WriteLineAsync(writeStatusMessage(projections)).ConfigureAwait(false); + await writer.WriteLineAsync().ConfigureAwait(false); + + throw new TimeoutException(writer.ToString()); } if (cancellationSource.IsCancellationRequested) { - throw new TimeoutException( - $"The projections timed out before reaching the initial sequence of {initial.EventSequenceNumber}"); + var writer = new StringWriter(); + await writer.WriteLineAsync($"The projections timed out before reaching the initial sequence of {initial.EventSequenceNumber}").ConfigureAwait(false); + await writer.WriteLineAsync().ConfigureAwait(false); + await writer.WriteLineAsync(writeStatusMessage(projections)).ConfigureAwait(false); + await writer.WriteLineAsync().ConfigureAwait(false); + + throw new TimeoutException(writer.ToString()); } } + private static string writeStatusMessage(IReadOnlyList projections) + { + + var grid = new Grid(); + grid.AddColumn("Shard Name", x => x.ShardName); + grid.AddColumn("Sequence", x => x.Sequence.ToString(), true); + + return grid.Write(projections); + + + } + private static bool isComplete(this Dictionary tracking, long highWaterMark) { return tracking.Values.All(x => x >= highWaterMark); diff --git a/src/Marten/Events/Daemon/Internals/BlockExtensions.cs b/src/Marten/Events/Daemon/Internals/BlockExtensions.cs deleted file mode 100644 index dcdb768570..0000000000 --- a/src/Marten/Events/Daemon/Internals/BlockExtensions.cs +++ /dev/null @@ -1,15 +0,0 @@ -using System.Threading; -using System.Threading.Tasks.Dataflow; - -namespace Marten.Events.Daemon.Internals; - -internal static class BlockExtensions -{ - public static ExecutionDataflowBlockOptions SequentialOptions(this CancellationToken token) - { - return new ExecutionDataflowBlockOptions - { - EnsureOrdered = true, MaxDegreeOfParallelism = 1, CancellationToken = token - }; - } -} diff --git a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs index 0ed5ccc892..7a3b100d9c 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs @@ -1,10 +1,9 @@ using System; -using System.Collections; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; +using JasperFx.Blocks; using JasperFx.Core; using JasperFx.Events; using JasperFx.Events.Daemon; @@ -17,7 +16,6 @@ using Marten.Services; namespace Marten.Events.Daemon.Internals; -#nullable enable /// /// Incrementally built batch command for projection updates @@ -26,22 +24,16 @@ public class ProjectionUpdateBatch: IUpdateBatch, IAsyncDisposable, IDisposable, { private readonly List _documentTypes = new(); private readonly List _pages = new(); + + private readonly List _patches = new(); + private readonly SemaphoreSlim _semaphore = new(1, 1); private readonly ProjectionOptions _settings; private readonly CancellationToken _token; + + private IMessageBatch? _batch; private OperationPage? _current; private DocumentSessionBase? _session; - private IMartenSession Session - { - get => _session ?? throw new InvalidOperationException("Session already released"); - } - - public List Listeners { get; } = new(); - - public ShardExecutionMode Mode { get; } - - public bool ShouldApplyListeners { get; set; } - internal ProjectionUpdateBatch(ProjectionOptions settings, DocumentSessionBase? session, ShardExecutionMode mode, CancellationToken token) { @@ -49,29 +41,46 @@ internal ProjectionUpdateBatch(ProjectionOptions settings, _session = session ?? throw new ArgumentNullException(nameof(session)); _token = token; Mode = mode; - Queue = new ActionBlock(processOperation, - new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = 1, EnsureOrdered = true, CancellationToken = token - }); + + Queue = new Block(processOperationAsync); startNewPage(session); } - public async Task WaitForCompletion() + private IMartenSession Session => _session ?? throw new InvalidOperationException("Session already released"); + + public List Listeners { get; } = new(); + + public ShardExecutionMode Mode { get; } + + public bool ShouldApplyListeners { get; set; } + + // TODO -- make this private + public Block Queue { get; } + + public async ValueTask DisposeAsync() { Queue.Complete(); - await Queue.Completion.ConfigureAwait(false); - foreach (var patch in _patches) + await Queue.DisposeAsync().ConfigureAwait(false); + + foreach (var page in _pages) page.ReleaseSession(); + + if (_session != null) { - applyOperation(patch); + await _session.DisposeAsync().ConfigureAwait(true); + _session = null; } - } - // TODO -- make this private - public ActionBlock Queue { get; } + Dispose(false); + GC.SuppressFinalize(this); + } + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } IEnumerable IUnitOfWork.Deletions() @@ -205,7 +214,7 @@ public void EjectAll() throw new NotSupportedException(); } - public void PurgeOperations(TId id) where T : notnull where TId: notnull + public void PurgeOperations(TId id) where T : notnull where TId : notnull { // Do nothing here } @@ -223,7 +232,10 @@ public async Task PostUpdateAsync(IMartenSession session) } var listeners = _settings.AsyncListeners.Concat(Listeners).ToArray(); - if (listeners.Length == 0) return; + if (listeners.Length == 0) + { + return; + } var unitOfWorkData = new UnitOfWork(_pages.SelectMany(x => x.Operations)); foreach (var listener in listeners) @@ -241,7 +253,10 @@ public async Task PreUpdateAsync(IMartenSession session) } var listeners = _settings.AsyncListeners.Concat(Listeners).ToArray(); - if (listeners.Length == 0) return; + if (listeners.Length == 0) + { + return; + } var unitOfWorkData = new UnitOfWork(_pages.SelectMany(x => x.Operations)); foreach (var listener in listeners) @@ -251,30 +266,52 @@ await listener.BeforeCommitAsync((IDocumentSession)session, unitOfWorkData, _tok } } + public IReadOnlyList BuildPages(IMartenSession session) + { + if (_token.IsCancellationRequested) + { + return Array.Empty(); + } + + // Guard against empty batches + return _pages.Where(x => x.Operations.Any()).ToList(); + } + + public async Task WaitForCompletion() + { + await Queue.WaitForCompletionAsync().ConfigureAwait(false); + + foreach (var patch in _patches) applyOperation(patch); + } + private void startNewPage(IMartenSession session) { if (_token.IsCancellationRequested) + { return; + } _current = new OperationPage(session); _pages.Add(_current); } - private readonly List _patches = new(); - - private void processOperation(IStorageOperation operation) + private Task processOperationAsync(IStorageOperation operation, CancellationToken _) { if (_token.IsCancellationRequested) - return; + { + return Task.CompletedTask; + } // If there's one patch, then everything needs to be queued up for later if (operation is PatchOperation || _patches.Any()) { _patches.Add(operation); - return; + return Task.CompletedTask; } applyOperation(operation); + + return Task.CompletedTask; } private void applyOperation(IStorageOperation operation) @@ -289,45 +326,18 @@ private void applyOperation(IStorageOperation operation) } } - public IReadOnlyList BuildPages(IMartenSession session) - { - if (_token.IsCancellationRequested) - { - return Array.Empty(); - } - - // Guard against empty batches - return _pages.Where(x => x.Operations.Any()).ToList(); - } - - - public ValueTask CloseSession() => DisposeAsync(); - - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - public async ValueTask DisposeAsync() + public ValueTask CloseSession() { - Queue.Complete(); - foreach (var page in _pages) page.ReleaseSession(); - - if (_session != null) - { - await _session.DisposeAsync().ConfigureAwait(true); - _session = null; - } - - Dispose(disposing: false); - GC.SuppressFinalize(this); + return DisposeAsync(); } protected void Dispose(bool disposing) { if (!disposing) + { return; + } Queue.Complete(); @@ -338,16 +348,19 @@ protected void Dispose(bool disposing) _session = null; } - private IMessageBatch? _batch; - private readonly SemaphoreSlim _semaphore = new(1, 1); - public async ValueTask CurrentMessageBatch(DocumentSessionBase session) { - if (_batch != null) return _batch; + if (_batch != null) + { + return _batch; + } await _semaphore.WaitAsync(_token).ConfigureAwait(false); - if (_batch != null) return _batch; + if (_batch != null) + { + return _batch; + } try { @@ -361,6 +374,4 @@ public async ValueTask CurrentMessageBatch(DocumentSessionBase se _semaphore.Release(); } } - - } diff --git a/src/Marten/Events/Daemon/ProjectionDaemon.cs b/src/Marten/Events/Daemon/ProjectionDaemon.cs index db38a7a5c3..1eeef4024b 100644 --- a/src/Marten/Events/Daemon/ProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/ProjectionDaemon.cs @@ -1,5 +1,9 @@ +using System; +using System.Threading; +using System.Threading.Tasks; using JasperFx.Events.Daemon; using JasperFx.Events.Daemon.HighWater; +using JasperFx.Events.Projections; using Marten.Events.Projections; using Marten.Storage; using Microsoft.Extensions.Logging; diff --git a/src/Marten/Events/Projections/Flattened/KebabConverter.cs b/src/Marten/Events/Projections/Flattened/KebabConverter.cs index 3bbde3aefb..f1cd6eff60 100644 --- a/src/Marten/Events/Projections/Flattened/KebabConverter.cs +++ b/src/Marten/Events/Projections/Flattened/KebabConverter.cs @@ -4,8 +4,6 @@ namespace Marten.Events.Projections.Flattened; internal static class KebabConverter { - // Shamelessly stolen from https://stackoverflow.com/questions/37301287/how-do-i-convert-pascalcase-to-kebab-case-with-c - public static string ToKebabCase(this string str) { diff --git a/src/Marten/Internal/Sessions/AutoClosingLifetime.cs b/src/Marten/Internal/Sessions/AutoClosingLifetime.cs index 1ec1ac782f..d49bf90684 100644 --- a/src/Marten/Internal/Sessions/AutoClosingLifetime.cs +++ b/src/Marten/Internal/Sessions/AutoClosingLifetime.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.Data; using System.Data.Common; +using System.Diagnostics; using System.Linq; using System.Runtime.ExceptionServices; using System.Threading; @@ -295,6 +296,15 @@ public async Task ExecuteBatchPagesAsync(IReadOnlyList pages, Lis foreach (var page in pages) { var batch = page.Compile(); + + foreach (var batchCommand in batch.BatchCommands) + { + if (batchCommand.CommandText.IsEmpty()) + { + Debug.WriteLine("what the hell?"); + } + } + batch.Timeout = CommandTimeout; batch.Connection = conn; batch.Transaction = tx; diff --git a/src/Marten/Marten.csproj b/src/Marten/Marten.csproj index 946c884e2c..f1324cf8a0 100644 --- a/src/Marten/Marten.csproj +++ b/src/Marten/Marten.csproj @@ -33,14 +33,15 @@ - - + + + - + diff --git a/src/Marten/Storage/DefaultTenancy.cs b/src/Marten/Storage/DefaultTenancy.cs index 03e9e0321e..bc3b0703c4 100644 --- a/src/Marten/Storage/DefaultTenancy.cs +++ b/src/Marten/Storage/DefaultTenancy.cs @@ -40,6 +40,11 @@ public ValueTask FindOrCreateDatabase(string tenantIdOrDatabase return new ValueTask(Default.Database); } + public ValueTask FindDatabase(DatabaseId id) + { + return new ValueTask(Default.Database); + } + public bool IsTenantStoredInCurrentDatabase(IMartenDatabase database, string tenantId) { return true; diff --git a/src/Marten/Storage/ITenancy.cs b/src/Marten/Storage/ITenancy.cs index f1080cea96..f17c84e21d 100644 --- a/src/Marten/Storage/ITenancy.cs +++ b/src/Marten/Storage/ITenancy.cs @@ -57,6 +57,16 @@ public interface ITenancy: IDatabaseSource, IDisposable, IDatabaseUser /// ValueTask FindOrCreateDatabase(string tenantIdOrDatabaseIdentifier); + /// + /// Find or create the named database + /// + /// + /// + ValueTask FindDatabase(DatabaseId id) + { + throw new NotImplementedException("You will need to implement this interface method to use a Marten store with Wolverine projection/subscription distribution"); + } + /// /// Asserts that the requested tenant id is part of the current database /// diff --git a/src/Marten/Storage/MartenDatabase.cs b/src/Marten/Storage/MartenDatabase.cs index 83a9533d58..9fa977191e 100644 --- a/src/Marten/Storage/MartenDatabase.cs +++ b/src/Marten/Storage/MartenDatabase.cs @@ -103,7 +103,12 @@ public override DatabaseDescriptor Describe() { var descriptor = base.Describe(); descriptor.SubjectUri = MartenSystemPart.MartenStoreUri; - descriptor.SchemaOrNamespace = Options.DatabaseSchemaName; + if (descriptor.SchemaOrNamespace.IsEmpty()) + { + descriptor.SchemaOrNamespace = Options?.DatabaseSchemaName ?? "public"; + } + + descriptor.TenantIds.AddRange(TenantIds); return descriptor; } diff --git a/src/Marten/Storage/MasterTableTenancy.cs b/src/Marten/Storage/MasterTableTenancy.cs index faf47b5323..fa61b46188 100644 --- a/src/Marten/Storage/MasterTableTenancy.cs +++ b/src/Marten/Storage/MasterTableTenancy.cs @@ -99,8 +99,9 @@ public async ValueTask> BuildDatabases() tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); // Be idempotent, don't duplicate - if (_databases.Contains(tenantId)) + if (_databases.TryFind(tenantId, out var db)) { + db.TenantIds.Fill(tenantId); continue; } @@ -109,6 +110,9 @@ public async ValueTask> BuildDatabases() var database = new MartenDatabase(_options, _options.NpgsqlDataSourceFactory.Create(connectionString), tenantId); + + database.TenantIds.Add(tenantId); + _databases = _databases.AddOrUpdate(tenantId, database); } @@ -185,6 +189,25 @@ public async ValueTask FindOrCreateDatabase(string tenantIdOrDa return database; } + public async ValueTask FindDatabase(DatabaseId id) + { + // Not worried about this being optimized at all + var database = _databases.Enumerate().Select(x => x.Value).FirstOrDefault(x => x.Id == id); + if (database != null) return database; + + // Try to refresh once + await BuildDatabases().ConfigureAwait(false); + + database = _databases.Enumerate().Select(x => x.Value).FirstOrDefault(x => x.Id == id); + + if (database == null) + { + throw new ArgumentOutOfRangeException(nameof(id), $"Requested database {id.Identity} cannot be found"); + } + + return database; + } + public bool IsTenantStoredInCurrentDatabase(IMartenDatabase database, string tenantId) { tenantId = _options.TenantIdStyle.MaybeCorrectTenantId(tenantId); @@ -276,10 +299,16 @@ private async Task seedDatabasesAsync(NpgsqlConnection conn) connectionString = _configuration.CorrectConnectionString(connectionString); - return connectionString.IsNotEmpty() - ? new MartenDatabase(_options, - _options.NpgsqlDataSourceFactory.Create(connectionString), tenantId) - : null; + if (connectionString.IsNotEmpty()) + { + var db = new MartenDatabase(_options, + _options.NpgsqlDataSourceFactory.Create(connectionString), tenantId); + db.TenantIds.Add(tenantId); + + return db; + } + + return null; } internal class TenantLookupDatabase: PostgresqlDatabase diff --git a/src/Marten/Storage/SingleServerMultiTenancy.cs b/src/Marten/Storage/SingleServerMultiTenancy.cs index 7d011f44c8..59ae50e26f 100644 --- a/src/Marten/Storage/SingleServerMultiTenancy.cs +++ b/src/Marten/Storage/SingleServerMultiTenancy.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using ImTools; using JasperFx; +using JasperFx.Core.Reflection; using JasperFx.Descriptors; using JasperFx.MultiTenancy; using Marten.Schema; @@ -85,6 +86,18 @@ public void Dispose() _default?.Database?.Dispose(); } + public ValueTask FindDatabase(DatabaseId id) + { + // Not worried about this being optimized at all + var database = _tenants.Enumerate().Select(x => x.Value.Database).FirstOrDefault(x => x.Id == id); + if (database == null) + { + throw new ArgumentOutOfRangeException(nameof(id), $"Requested database {id.Identity} cannot be found"); + } + + return ValueTask.FromResult(database); + } + public Tenant GetTenant(string tenantId) { if (_tenants.TryFind(tenantId, out var tenant)) diff --git a/src/Marten/Storage/StandinDatabase.cs b/src/Marten/Storage/StandinDatabase.cs index 866de48111..aa7fcd9623 100644 --- a/src/Marten/Storage/StandinDatabase.cs +++ b/src/Marten/Storage/StandinDatabase.cs @@ -24,11 +24,15 @@ public StandinDatabase(StoreOptions options) Providers = new ProviderGraph(options); } + public DatabaseId Id { get; } = new DatabaseId("standin", "standin"); + public DatabaseDescriptor Describe() { throw new NotImplementedException(); } + public List TenantIds { get; } = new(); + public IFeatureSchema[] BuildFeatureSchemas() { throw new NotImplementedException(); diff --git a/src/Marten/Storage/StaticMultiTenancy.cs b/src/Marten/Storage/StaticMultiTenancy.cs index 7bbc35c5e3..be3b891e17 100644 --- a/src/Marten/Storage/StaticMultiTenancy.cs +++ b/src/Marten/Storage/StaticMultiTenancy.cs @@ -134,6 +134,18 @@ public ValueTask> BuildDatabases() return new ValueTask>(databases); } + ValueTask ITenancy.FindDatabase(DatabaseId id) + { + // Not worried about this being optimized at all + var database = _databases.Enumerate().Select(x => x.Value).FirstOrDefault(x => x.Id == id); + if (database == null) + { + throw new ArgumentOutOfRangeException(nameof(id), $"Requested database {id.Identity} cannot be found"); + } + + return ValueTask.FromResult(database); + } + public Tenant GetTenant(string tenantId) { if (_tenants.TryFind(tenantId, out var tenant)) @@ -188,6 +200,7 @@ public DatabaseExpression ForTenants(params string[] tenantIds) foreach (var tenantId in tenantIds) { var tenant = new Tenant(tenantId, _database); + _database.TenantIds.Add(tenantId); _parent._tenants = _parent._tenants.AddOrUpdate(tenantId, tenant); } diff --git a/src/Marten/Storage/Tenant2.cs b/src/Marten/Storage/Tenant2.cs index 4d1c948676..70c011eca6 100644 --- a/src/Marten/Storage/Tenant2.cs +++ b/src/Marten/Storage/Tenant2.cs @@ -1,5 +1,6 @@ using System; using JasperFx; +using JasperFx.Core; namespace Marten.Storage; @@ -9,6 +10,11 @@ public Tenant(string tenantId, IMartenDatabase inner) { Database = inner; TenantId = tenantId; + + if (Database is MartenDatabase db) + { + db.TenantIds.Fill(tenantId); + } } public string TenantId { get; } diff --git a/src/Marten/StoreOptions.GeneratesCode.cs b/src/Marten/StoreOptions.GeneratesCode.cs index 23d4b6ca8d..7a27bf61db 100644 --- a/src/Marten/StoreOptions.GeneratesCode.cs +++ b/src/Marten/StoreOptions.GeneratesCode.cs @@ -40,7 +40,7 @@ public bool SourceCodeWritingEnabled } // This would only be set for "additional" document stores - public string StoreName { get; set; } = "Marten"; + public string StoreName { get; set; } = "Main"; /// /// Root folder where generated code should be placed. By default, this is the IHostEnvironment.ContentRootPath @@ -72,7 +72,7 @@ internal GenerationRules CreateGenerationRules() SourceCodeWritingEnabled = SourceCodeWritingEnabled }; - if (StoreName.IsNotEmpty() && StoreName != "Marten") + if (StoreName.IsNotEmpty() && StoreName != "Marten" && StoreName != "Main") { rules.GeneratedNamespace += "." + StoreName; rules.GeneratedCodeOutputPath = Path.Combine(rules.GeneratedCodeOutputPath, StoreName);