Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,18 @@ public async Task PostUpdateAsync(IMartenSession session)
return;
}

var listeners = _settings.AsyncListeners.Concat(Listeners).ToArray();
if (listeners.Length == 0)
if (_settings.AsyncListeners.Count == 0 && Listeners.Count == 0)
{
return;
}

var unitOfWorkData = new UnitOfWork(_pages.SelectMany(x => x.Operations));
foreach (var listener in listeners)
foreach (var listener in _settings.AsyncListeners)
{
await listener.AfterCommitAsync((IDocumentSession)session, unitOfWorkData, _token)
.ConfigureAwait(false);
}
foreach (var listener in Listeners)
{
await listener.AfterCommitAsync((IDocumentSession)session, unitOfWorkData, _token)
.ConfigureAwait(false);
Expand All @@ -261,14 +265,18 @@ public async Task PreUpdateAsync(IMartenSession session)
return;
}

var listeners = _settings.AsyncListeners.Concat(Listeners).ToArray();
if (listeners.Length == 0)
if (_settings.AsyncListeners.Count == 0 && Listeners.Count == 0)
{
return;
}

var unitOfWorkData = new UnitOfWork(_pages.SelectMany(x => x.Operations));
foreach (var listener in listeners)
foreach (var listener in _settings.AsyncListeners)
{
await listener.BeforeCommitAsync((IDocumentSession)session, unitOfWorkData, _token)
.ConfigureAwait(false);
}
foreach (var listener in Listeners)
{
await listener.BeforeCommitAsync((IDocumentSession)session, unitOfWorkData, _token)
.ConfigureAwait(false);
Expand Down
77 changes: 60 additions & 17 deletions src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,21 @@ public void Postprocess(DbDataReader reader, IList<Exception> exceptions)
var values = reader.GetFieldValue<long[]>(0);

var finalVersion = values[0];
foreach (var e in Stream.Events.Reverse())
var events = Stream.Events;
for (int i = events.Count - 1; i >= 0; i--)
{
e.Version = finalVersion;
events[i].Version = finalVersion;
finalVersion--;
}

// Ignore the first value
for (int i = 1; i < values.Length; i++)
{
// Only setting the sequence to aid in tombstone processing
Stream.Events[i - 1].Sequence = values[i];
events[i - 1].Sequence = values[i];
}

if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 1)
if (Events is { UseMandatoryStreamTypeDeclaration: true } && events[0].Version == 1)
{
throw new NonExistentStreamException(Events.StreamIdentity == StreamIdentity.AsGuid
? Stream.Id
Expand Down Expand Up @@ -90,46 +91,87 @@ protected void writeBasicParameters(IGroupedParameterBuilder builder, IMartenSes
var param2 = builder.AppendParameter(Stream.TenantId);
param2.NpgsqlDbType = NpgsqlDbType.Varchar;

var param3 = builder.AppendParameter(Stream.Events.Select(x => x.Id).ToArray());
var events = Stream.Events;
var count = events.Count;
var ids = new Guid[count];
var typeNames = new string[count];
var dotNetTypeNames = new string[count];
var jsonBodies = new string[count];

for (int i = 0; i < count; i++)
{
var e = events[i];
ids[i] = e.Id;
typeNames[i] = e.EventTypeName;
dotNetTypeNames[i] = e.DotNetTypeName;
jsonBodies[i] = session.Serializer.ToJson(e.Data);
}

var param3 = builder.AppendParameter(ids);
param3.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Uuid;

var param4 = builder.AppendParameter(Stream.Events.Select(x => x.EventTypeName).ToArray());
var param4 = builder.AppendParameter(typeNames);
param4.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;

var param5 = builder.AppendParameter(Stream.Events.Select(x => x.DotNetTypeName).ToArray());
var param5 = builder.AppendParameter(dotNetTypeNames);
param5.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;

var param6 = builder.AppendParameter(Stream.Events.Select(e => session.Serializer.ToJson(e.Data)).ToArray());
var param6 = builder.AppendParameter(jsonBodies);
param6.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb;
}

/// <summary>
/// Appends the causation id of every event in the stream as a single
/// varchar[] parameter on the grouped builder.
/// </summary>
/// <param name="builder">Parameter builder for the current batched command.</param>
protected void writeCausationIds(IGroupedParameterBuilder builder)
{
    var events = Stream.Events;
    var count = events.Count;

    // Pre-sized array fill instead of Select().ToArray() to avoid the
    // iterator and resize allocations on this hot append path.
    var causationIds = new string[count];
    for (int i = 0; i < count; i++)
    {
        causationIds[i] = events[i].CausationId;
    }

    var param = builder.AppendParameter(causationIds);
    param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
}

/// <summary>
/// Appends the correlation id of every event in the stream as a single
/// varchar[] parameter on the grouped builder.
/// </summary>
/// <param name="builder">Parameter builder for the current batched command.</param>
protected void writeCorrelationIds(IGroupedParameterBuilder builder)
{
    var events = Stream.Events;
    var count = events.Count;

    // Pre-sized array fill instead of Select().ToArray() to avoid the
    // iterator and resize allocations on this hot append path.
    var correlationIds = new string[count];
    for (int i = 0; i < count; i++)
    {
        correlationIds[i] = events[i].CorrelationId;
    }

    var param = builder.AppendParameter(correlationIds);
    param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
}

/// <summary>
/// Serializes the header dictionary of every event to JSON and appends
/// the results as a single jsonb[] parameter on the grouped builder.
/// </summary>
/// <param name="builder">Parameter builder for the current batched command.</param>
/// <param name="session">Session supplying the serializer used for the header JSON.</param>
protected void writeHeaders(IGroupedParameterBuilder builder, IMartenSession session)
{
    var events = Stream.Events;
    var count = events.Count;

    // Pre-sized array fill instead of Select().ToArray() to avoid the
    // iterator and resize allocations on this hot append path.
    var headers = new string[count];
    for (int i = 0; i < count; i++)
    {
        headers[i] = session.Serializer.ToJson(events[i].Headers);
    }

    var param = builder.AppendParameter(headers);
    param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb;
}

/// <summary>
/// Appends the user name recorded on every event in the stream as a single
/// varchar[] parameter on the grouped builder.
/// </summary>
/// <param name="builder">Parameter builder for the current batched command.</param>
/// <param name="session">Kept for signature compatibility with the other write* helpers; not used here.</param>
protected void writeUserNames(IGroupedParameterBuilder builder, IMartenSession session)
{
    var events = Stream.Events;
    var count = events.Count;

    // Pre-sized array fill instead of Select().ToArray() to avoid the
    // iterator and resize allocations on this hot append path.
    var userNames = new string[count];
    for (int i = 0; i < count; i++)
    {
        userNames[i] = events[i].UserName;
    }

    var param = builder.AppendParameter(userNames);
    param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar;
}

/// <summary>
/// Appends the timestamp of every event in the stream as a single
/// timestamptz[] parameter on the grouped builder.
/// </summary>
/// <param name="builder">Parameter builder for the current batched command.</param>
protected void writeTimestamps(IGroupedParameterBuilder builder)
{
    var events = Stream.Events;
    var count = events.Count;

    // Pre-sized array fill instead of Select().ToArray() to avoid the
    // iterator and resize allocations on this hot append path.
    var timestamps = new DateTimeOffset[count];
    for (int i = 0; i < count; i++)
    {
        timestamps[i] = events[i].Timestamp;
    }

    var param = builder.AppendParameter(timestamps);
    param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.TimestampTz;
}

Expand All @@ -140,20 +182,21 @@ public async Task PostprocessAsync(DbDataReader reader, IList<Exception> excepti
var values = await reader.GetFieldValueAsync<long[]>(0, token).ConfigureAwait(false);

var finalVersion = values[0];
foreach (var e in Stream.Events.Reverse())
var events = Stream.Events;
for (int i = events.Count - 1; i >= 0; i--)
{
e.Version = finalVersion;
events[i].Version = finalVersion;
finalVersion--;
}

// Ignore the first value
for (int i = 1; i < values.Length; i++)
{
// Only setting the sequence to aid in tombstone processing
Stream.Events[i - 1].Sequence = values[i];
events[i - 1].Sequence = values[i];
}

if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 1)
if (Events is { UseMandatoryStreamTypeDeclaration: true } && events[0].Version == 1)
{
throw new NonExistentStreamException(Events.StreamIdentity == StreamIdentity.AsGuid
? Stream.Id
Expand Down
10 changes: 10 additions & 0 deletions src/Marten/Internal/DirtyTracking/ChangeTracker.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Diagnostics.CodeAnalysis;
using Marten.Internal.Operations;
using Newtonsoft.Json.Linq;
Expand All @@ -20,6 +21,15 @@ public ChangeTracker(IMartenSession session, T document)
public bool DetectChanges(IMartenSession session, [NotNullWhen(true)]out IStorageOperation? operation)
{
var newJson = session.Serializer.ToCleanJson(_document);

// Fast path: if the JSON strings are identical, skip expensive parsing
if (string.Equals(_json, newJson, StringComparison.Ordinal))
{
operation = null;
return false;
}

// Slow path: parse and deep-compare to handle semantically equivalent JSON
if (JToken.DeepEquals(JObject.Parse(_json), JObject.Parse(newJson)))
{
operation = null;
Expand Down
19 changes: 11 additions & 8 deletions src/Marten/Internal/Sessions/DirtyCheckingDocumentSession.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using JasperFx.Core;
using Marten.Internal.Operations;
Expand Down Expand Up @@ -37,16 +38,18 @@ protected internal override void resetDirtyChecking()
{
foreach (var tracker in ChangeTrackers) tracker.Reset(this);

var knownDocuments = ChangeTrackers.Select(x => x.Document).ToArray();
var knownDocuments = new HashSet<object>(
ChangeTrackers.Select(x => x.Document),
ReferenceEqualityComparer.Instance);

var operations = _workTracker.AllOperations
.OfType<IDocumentStorageOperation>()
.Where(x => !knownDocuments.Contains(x.Document));

foreach (var operation in operations)
var operations = _workTracker.AllOperations;
for (int i = 0; i < operations.Count; i++)
{
var tracker = operation.ToTracker(this);
ChangeTrackers.Add(tracker);
if (operations[i] is IDocumentStorageOperation op && !knownDocuments.Contains(op.Document))
{
var tracker = op.ToTracker(this);
ChangeTrackers.Add(tracker);
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Marten/Internal/Storage/DocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public abstract class DocumentStorage<T, TId>: IDocumentStorage<T, TId>, IHaveMe
private readonly string _selectClause;
private readonly string[] _selectFields;
private ISqlFragment? _defaultWhere;
private MetadataColumn[]? _metadataColumns;
protected Action<T, TId> _setter;
protected Action<T, string> _setFromString = (_, _) => throw new NotSupportedException();
protected Action<T, Guid> _setFromGuid = (_, _) => throw new NotSupportedException();
Expand Down Expand Up @@ -147,7 +148,7 @@ public ISelectClause SelectClauseWithDuplicatedFields

/// <summary>
/// Returns the metadata columns of the mapped table. The result is computed
/// once and cached in <c>_metadataColumns</c>, since the table schema does
/// not change after the document storage is built.
/// </summary>
MetadataColumn[] IHaveMetadataColumns.MetadataColumns()
{
    // ??= gives lazy, idempotent initialization of the cached array.
    return _metadataColumns ??= _mapping.Schema.Table.Columns.OfType<MetadataColumn>().ToArray();
}

public IQueryableMemberCollection QueryMembers => _mapping.QueryMembers;
Expand Down
16 changes: 15 additions & 1 deletion src/Marten/Internal/UnitOfWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,26 @@ public void Add(IStorageOperation operation)
}
}

/// <summary>
/// All pending storage operations, event operations first. When one of the
/// two underlying lists is empty, the other is returned directly to avoid a
/// copy — callers must treat the result as read-only and must not assume it
/// is a snapshot independent of this unit of work.
/// </summary>
public IReadOnlyList<IStorageOperation> AllOperations
{
    get
    {
        // Fast paths: no merge needed, hand back the existing list.
        if (_eventOperations.Count == 0) return _operations;
        if (_operations.Count == 0) return _eventOperations;

        // Pre-size the combined list to avoid intermediate growth.
        var combined = new List<IStorageOperation>(_eventOperations.Count + _operations.Count);
        combined.AddRange(_eventOperations);
        combined.AddRange(_operations);
        return combined;
    }
}

public void Sort(StoreOptions options)
{
if (shouldSort(options, out var comparer))
{
// Must use a stable sort to preserve insertion order for operations
// on the same document type (required for self-referencing foreign keys)
var sorted = _operations.OrderBy(f => f, comparer).ToList();
_operations.Clear();
_operations.AddRange(sorted);
Expand Down
13 changes: 8 additions & 5 deletions src/Marten/Internal/UpdateBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,21 @@ private IEnumerable<OperationPage> buildPages(IMartenSession session)
else
{
var count = 0;
var batchSize = session.Options.UpdateBatchSize;

while (count < _operations.Count)
{
var operations = _operations
.Skip(count)
.Take(session.Options.UpdateBatchSize)
.ToArray();
var remaining = Math.Min(batchSize, _operations.Count - count);
var operations = new IStorageOperation[remaining];
for (int i = 0; i < remaining; i++)
{
operations[i] = _operations[count + i];
}

var page = new OperationPage(session, operations);
yield return page;

count += session.Options.UpdateBatchSize;
count += batchSize;
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/Marten/Linq/QueryHandlers/NamedParameterHelper.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using NpgsqlTypes;
using Weasel.Postgresql;

Expand All @@ -14,15 +16,18 @@ namespace Marten.Linq.QueryHandlers;
/// </summary>
internal static class NamedParameterHelper
{
private static readonly ConcurrentDictionary<Type, PropertyInfo[]> _propertyCache = new();

/// <summary>
/// Appends the SQL to the builder, replacing :paramName references with positional
/// parameters that have proper NpgsqlDbType set based on the property's declared type.
/// This ensures null parameter values still carry type information.
/// </summary>
public static void AppendSqlWithNamedParameters(ICommandBuilder builder, string sql, object parameters)
{
var properties = parameters.GetType().GetProperties();
var propLookup = new Dictionary<string, (object? value, Type propertyType)>(StringComparer.OrdinalIgnoreCase);
var parameterType = parameters.GetType();
var properties = _propertyCache.GetOrAdd(parameterType, static t => t.GetProperties());
var propLookup = new Dictionary<string, (object? value, Type propertyType)>(properties.Length, StringComparer.OrdinalIgnoreCase);
foreach (var property in properties)
{
propLookup[property.Name] = (property.GetValue(parameters), property.PropertyType);
Expand Down
Loading