diff --git a/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs b/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs index cb185d9ca4..27e1ccab03 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs @@ -5,6 +5,7 @@ using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Services; +using Npgsql; namespace Marten.Events.Daemon.Internals; @@ -23,6 +24,15 @@ public ProjectionDocumentSession(DocumentStore store, Mode = mode; } + public override NpgsqlConnection Connection + { + get + { + throw new NotSupportedException( + "It is not supported to use \"sticky\" connections inside of projections or subscriptions"); + } + } + internal override DocumentTracking TrackingMode => SessionOptions.Tracking; protected internal override IDocumentStorage selectStorage(DocumentProvider provider) => diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.cs index 3c23d73b23..f52c1539fe 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.cs @@ -1,14 +1,10 @@ -#nullable enable using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; -using System.Threading.Tasks; using JasperFx.Core; using JasperFx.Core.Reflection; using Marten.Events; -using Marten.Events.Aggregation; -using Marten.Events.Daemon.Internals; using Marten.Exceptions; using Marten.Internal.Operations; using Marten.Internal.Storage; @@ -54,7 +50,6 @@ public void EjectAllPendingChanges() ChangeTrackers.Clear(); } - public void Store(IEnumerable entities) where T : notnull { Store(entities?.ToArray()!); @@ -94,6 +89,7 @@ public void UpdateRevision(T entity, int revision) where T : notnull { r.Revision = revision; } + _workTracker.Add(op); } @@ -109,6 +105,7 @@ public void TryUpdateRevision(T entity, int revision) where T : notnull r.Revision = revision; r.IgnoreConcurrencyViolation = true; } + _workTracker.Add(op); } @@ -141,7 +138,11 @@ public void Insert(params T[] entities) where T : notnull { storage.Store(this, entity); var op = storage.Insert(entity, this, TenantId); - if (op is IRevisionedOperation r) r.Revision = 1; + if (op is IRevisionedOperation r) + { + r.Revision = 1; + } + _workTracker.Add(op); } } @@ -206,15 +207,17 @@ public void InsertObjects(IEnumerable documents) public void QueueSqlCommand(string sql, params object[] parameterValues) { - QueueSqlCommand(DefaultParameterPlaceholder, sql, parameterValues: parameterValues); + QueueSqlCommand(DefaultParameterPlaceholder, sql, parameterValues); } public void QueueSqlCommand(char placeholder, string sql, params object[] parameterValues) { sql = sql.TrimEnd(';'); if (sql.Contains(';')) + { throw new ArgumentOutOfRangeException(nameof(sql), "You must specify one SQL command at a time because of Marten's usage of command batching. ';' cannot be used as a command separator here."); + } var operation = new ExecuteSqlStorageOperation(placeholder, sql, parameterValues); QueueOperation(operation); @@ -335,7 +338,8 @@ private void store(IEnumerable entities) where T : notnull { var storage = StorageFor(); - if (Concurrency == ConcurrencyChecks.Disabled && (storage.UseOptimisticConcurrency || storage.UseNumericRevisions)) + if (Concurrency == ConcurrencyChecks.Disabled && + (storage.UseOptimisticConcurrency || storage.UseNumericRevisions)) { foreach (var entity in entities) { @@ -385,36 +389,6 @@ public void EjectPatchedTypes(IUnitOfWork changes) foreach (var type in patchedTypes) EjectAllOfType(type); } - internal interface IObjectHandler - { - void Execute(IDocumentSession session, IEnumerable objects); - } - - internal class StoreHandler: IObjectHandler where T : notnull - { - public void Execute(IDocumentSession session, IEnumerable objects) - { - // Delegate to the Store() method - session.Store(objects.OfType().ToArray()); - } - } - - internal class InsertHandler: IObjectHandler where T : notnull - { - public void Execute(IDocumentSession session, IEnumerable objects) - { - session.Insert(objects.OfType().ToArray()); - } - } - - internal class DeleteHandler: IObjectHandler where T : notnull - { - public void Execute(IDocumentSession session, IEnumerable objects) - { - foreach (var document in objects.OfType()) session.Delete(document); - } - } - internal void StoreDocumentInItemMap(TId id, TDoc document) where TDoc : class where TId : notnull { if (ItemMap.ContainsKey(typeof(TDoc))) @@ -429,7 +403,8 @@ internal void StoreDocumentInItemMap(TId id, TDoc document) where TDo } } - internal bool TryGetAggregateFromIdentityMap(TId id, [NotNullWhen(true)]out TDoc? document) where TDoc: notnull where TId : notnull + internal bool TryGetAggregateFromIdentityMap(TId id, [NotNullWhen(true)] out TDoc? document) + where TDoc : notnull where TId : notnull { if (Options.EventGraph.UseIdentityMapForAggregates) { @@ -449,4 +424,34 @@ internal bool TryGetAggregateFromIdentityMap(TId id, [NotNullWhen(tru document = default; return false; } + + internal interface IObjectHandler + { + void Execute(IDocumentSession session, IEnumerable objects); + } + + internal class StoreHandler: IObjectHandler where T : notnull + { + public void Execute(IDocumentSession session, IEnumerable objects) + { + // Delegate to the Store() method + session.Store(objects.OfType().ToArray()); + } + } + + internal class InsertHandler: IObjectHandler where T : notnull + { + public void Execute(IDocumentSession session, IEnumerable objects) + { + session.Insert(objects.OfType().ToArray()); + } + } + + internal class DeleteHandler: IObjectHandler where T : notnull + { + public void Execute(IDocumentSession session, IEnumerable objects) + { + foreach (var document in objects.OfType()) session.Delete(document); + } + } } diff --git a/src/Marten/Internal/Sessions/QuerySession.cs b/src/Marten/Internal/Sessions/QuerySession.cs index 97b27a7bf3..0064b96b14 100644 --- a/src/Marten/Internal/Sessions/QuerySession.cs +++ b/src/Marten/Internal/Sessions/QuerySession.cs @@ -90,7 +90,7 @@ internal QuerySession( public ConcurrencyChecks Concurrency { get; protected set; } = ConcurrencyChecks.Enabled; - public NpgsqlConnection Connection + public virtual NpgsqlConnection Connection { get {