Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
using Marten.Services;
using Npgsql;

namespace Marten.Events.Daemon.Internals;

Expand All @@ -23,6 +24,15 @@ public ProjectionDocumentSession(DocumentStore store,
Mode = mode;
}

public override NpgsqlConnection Connection
{
get
{
throw new NotSupportedException(
"It is not supported to use \"sticky\" connections inside of projections or subscriptions");
}
}

internal override DocumentTracking TrackingMode => SessionOptions.Tracking;

protected internal override IDocumentStorage<T> selectStorage<T>(DocumentProvider<T> provider) =>
Expand Down
83 changes: 44 additions & 39 deletions src/Marten/Internal/Sessions/DocumentSessionBase.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Daemon.Internals;
using Marten.Exceptions;
using Marten.Internal.Operations;
using Marten.Internal.Storage;
Expand Down Expand Up @@ -54,7 +50,6 @@ public void EjectAllPendingChanges()
ChangeTrackers.Clear();
}


public void Store<T>(IEnumerable<T> entities) where T : notnull
{
Store(entities?.ToArray()!);
Expand Down Expand Up @@ -94,6 +89,7 @@ public void UpdateRevision<T>(T entity, int revision) where T : notnull
{
r.Revision = revision;
}

_workTracker.Add(op);
}

Expand All @@ -109,6 +105,7 @@ public void TryUpdateRevision<T>(T entity, int revision) where T : notnull
r.Revision = revision;
r.IgnoreConcurrencyViolation = true;
}

_workTracker.Add(op);
}

Expand Down Expand Up @@ -141,7 +138,11 @@ public void Insert<T>(params T[] entities) where T : notnull
{
storage.Store(this, entity);
var op = storage.Insert(entity, this, TenantId);
if (op is IRevisionedOperation r) r.Revision = 1;
if (op is IRevisionedOperation r)
{
r.Revision = 1;
}

_workTracker.Add(op);
}
}
Expand Down Expand Up @@ -206,15 +207,17 @@ public void InsertObjects(IEnumerable<object> documents)

public void QueueSqlCommand(string sql, params object[] parameterValues)
{
QueueSqlCommand(DefaultParameterPlaceholder, sql, parameterValues: parameterValues);
QueueSqlCommand(DefaultParameterPlaceholder, sql, parameterValues);
}

public void QueueSqlCommand(char placeholder, string sql, params object[] parameterValues)
{
sql = sql.TrimEnd(';');
if (sql.Contains(';'))
{
throw new ArgumentOutOfRangeException(nameof(sql),
"You must specify one SQL command at a time because of Marten's usage of command batching. ';' cannot be used as a command separator here.");
}

var operation = new ExecuteSqlStorageOperation(placeholder, sql, parameterValues);
QueueOperation(operation);
Expand Down Expand Up @@ -335,7 +338,8 @@ private void store<T>(IEnumerable<T> entities) where T : notnull
{
var storage = StorageFor<T>();

if (Concurrency == ConcurrencyChecks.Disabled && (storage.UseOptimisticConcurrency || storage.UseNumericRevisions))
if (Concurrency == ConcurrencyChecks.Disabled &&
(storage.UseOptimisticConcurrency || storage.UseNumericRevisions))
{
foreach (var entity in entities)
{
Expand Down Expand Up @@ -385,36 +389,6 @@ public void EjectPatchedTypes(IUnitOfWork changes)
foreach (var type in patchedTypes) EjectAllOfType(type);
}

internal interface IObjectHandler
{
void Execute(IDocumentSession session, IEnumerable<object> objects);
}

internal class StoreHandler<T>: IObjectHandler where T : notnull
{
public void Execute(IDocumentSession session, IEnumerable<object> objects)
{
// Delegate to the Store<T>() method
session.Store(objects.OfType<T>().ToArray());
}
}

internal class InsertHandler<T>: IObjectHandler where T : notnull
{
public void Execute(IDocumentSession session, IEnumerable<object> objects)
{
session.Insert(objects.OfType<T>().ToArray());
}
}

internal class DeleteHandler<T>: IObjectHandler where T : notnull
{
public void Execute(IDocumentSession session, IEnumerable<object> objects)
{
foreach (var document in objects.OfType<T>()) session.Delete(document);
}
}

internal void StoreDocumentInItemMap<TDoc, TId>(TId id, TDoc document) where TDoc : class where TId : notnull
{
if (ItemMap.ContainsKey(typeof(TDoc)))
Expand All @@ -429,7 +403,8 @@ internal void StoreDocumentInItemMap<TDoc, TId>(TId id, TDoc document) where TDo
}
}

internal bool TryGetAggregateFromIdentityMap<TDoc, TId>(TId id, [NotNullWhen(true)]out TDoc? document) where TDoc: notnull where TId : notnull
internal bool TryGetAggregateFromIdentityMap<TDoc, TId>(TId id, [NotNullWhen(true)] out TDoc? document)
where TDoc : notnull where TId : notnull
{
if (Options.EventGraph.UseIdentityMapForAggregates)
{
Expand All @@ -449,4 +424,34 @@ internal bool TryGetAggregateFromIdentityMap<TDoc, TId>(TId id, [NotNullWhen(tru
document = default;
return false;
}

internal interface IObjectHandler
{
void Execute(IDocumentSession session, IEnumerable<object> objects);
}

internal class StoreHandler<T>: IObjectHandler where T : notnull
{
public void Execute(IDocumentSession session, IEnumerable<object> objects)
{
// Delegate to the Store<T>() method
session.Store(objects.OfType<T>().ToArray());
}
}

internal class InsertHandler<T>: IObjectHandler where T : notnull
{
public void Execute(IDocumentSession session, IEnumerable<object> objects)
{
session.Insert(objects.OfType<T>().ToArray());
}
}

internal class DeleteHandler<T>: IObjectHandler where T : notnull
{
public void Execute(IDocumentSession session, IEnumerable<object> objects)
{
foreach (var document in objects.OfType<T>()) session.Delete(document);
}
}
}
2 changes: 1 addition & 1 deletion src/Marten/Internal/Sessions/QuerySession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ internal QuerySession(

public ConcurrencyChecks Concurrency { get; protected set; } = ConcurrencyChecks.Enabled;

public NpgsqlConnection Connection
public virtual NpgsqlConnection Connection
{
get
{
Expand Down
Loading