diff --git a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs index d275dcad44..bb423bec34 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs @@ -240,14 +240,18 @@ public async Task PostUpdateAsync(IMartenSession session) return; } - var listeners = _settings.AsyncListeners.Concat(Listeners).ToArray(); - if (listeners.Length == 0) + if (_settings.AsyncListeners.Count == 0 && Listeners.Count == 0) { return; } var unitOfWorkData = new UnitOfWork(_pages.SelectMany(x => x.Operations)); - foreach (var listener in listeners) + foreach (var listener in _settings.AsyncListeners) + { + await listener.AfterCommitAsync((IDocumentSession)session, unitOfWorkData, _token) + .ConfigureAwait(false); + } + foreach (var listener in Listeners) { await listener.AfterCommitAsync((IDocumentSession)session, unitOfWorkData, _token) .ConfigureAwait(false); @@ -261,14 +265,18 @@ public async Task PreUpdateAsync(IMartenSession session) return; } - var listeners = _settings.AsyncListeners.Concat(Listeners).ToArray(); - if (listeners.Length == 0) + if (_settings.AsyncListeners.Count == 0 && Listeners.Count == 0) { return; } var unitOfWorkData = new UnitOfWork(_pages.SelectMany(x => x.Operations)); - foreach (var listener in listeners) + foreach (var listener in _settings.AsyncListeners) + { + await listener.BeforeCommitAsync((IDocumentSession)session, unitOfWorkData, _token) + .ConfigureAwait(false); + } + foreach (var listener in Listeners) { await listener.BeforeCommitAsync((IDocumentSession)session, unitOfWorkData, _token) .ConfigureAwait(false); diff --git a/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs index 92e15a1719..08bf5a772e 100644 --- a/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs +++ b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs @@ -48,9 +48,10 @@ public void Postprocess(DbDataReader reader, IList exceptions) var values = reader.GetFieldValue(0); var finalVersion = values[0]; - foreach (var e in Stream.Events.Reverse()) + var events = Stream.Events; + for (int i = events.Count - 1; i >= 0; i--) { - e.Version = finalVersion; + events[i].Version = finalVersion; finalVersion--; } @@ -58,10 +59,10 @@ public void Postprocess(DbDataReader reader, IList exceptions) for (int i = 1; i < values.Length; i++) { // Only setting the sequence to aid in tombstone processing - Stream.Events[i - 1].Sequence = values[i]; + events[i - 1].Sequence = values[i]; } - if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 1) + if (Events is { UseMandatoryStreamTypeDeclaration: true } && events[0].Version == 1) { throw new NonExistentStreamException(Events.StreamIdentity == StreamIdentity.AsGuid ? Stream.Id @@ -90,46 +91,87 @@ protected void writeBasicParameters(IGroupedParameterBuilder builder, IMartenSes var param2 = builder.AppendParameter(Stream.TenantId); param2.NpgsqlDbType = NpgsqlDbType.Varchar; - var param3 = builder.AppendParameter(Stream.Events.Select(x => x.Id).ToArray()); + var events = Stream.Events; + var count = events.Count; + var ids = new Guid[count]; + var typeNames = new string[count]; + var dotNetTypeNames = new string[count]; + var jsonBodies = new string[count]; + + for (int i = 0; i < count; i++) + { + var e = events[i]; + ids[i] = e.Id; + typeNames[i] = e.EventTypeName; + dotNetTypeNames[i] = e.DotNetTypeName; + jsonBodies[i] = session.Serializer.ToJson(e.Data); + } + + var param3 = builder.AppendParameter(ids); param3.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Uuid; - var param4 = builder.AppendParameter(Stream.Events.Select(x => x.EventTypeName).ToArray()); + var param4 = builder.AppendParameter(typeNames); param4.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; - var param5 = builder.AppendParameter(Stream.Events.Select(x => x.DotNetTypeName).ToArray()); + var param5 = builder.AppendParameter(dotNetTypeNames); param5.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; - var param6 = builder.AppendParameter(Stream.Events.Select(e => session.Serializer.ToJson(e.Data)).ToArray()); + var param6 = builder.AppendParameter(jsonBodies); param6.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb; } protected void writeCausationIds(IGroupedParameterBuilder builder) { - var param = builder.AppendParameter(Stream.Events.Select(x => x.CausationId).ToArray()); + var events = Stream.Events; + var count = events.Count; + var causationIds = new string[count]; + for (int i = 0; i < count; i++) causationIds[i] = events[i].CausationId; + + var param = builder.AppendParameter(causationIds); param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; } protected void writeCorrelationIds(IGroupedParameterBuilder builder) { - var param = builder.AppendParameter(Stream.Events.Select(x => x.CorrelationId).ToArray()); + var events = Stream.Events; + var count = events.Count; + var correlationIds = new string[count]; + for (int i = 0; i < count; i++) correlationIds[i] = events[i].CorrelationId; + + var param = builder.AppendParameter(correlationIds); param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; } protected void writeHeaders(IGroupedParameterBuilder builder, IMartenSession session) { - var param = builder.AppendParameter(Stream.Events.Select(x => session.Serializer.ToJson(x.Headers)).ToArray()); + var events = Stream.Events; + var count = events.Count; + var headers = new string[count]; + for (int i = 0; i < count; i++) headers[i] = session.Serializer.ToJson(events[i].Headers); + + var param = builder.AppendParameter(headers); param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Jsonb; } protected void writeUserNames(IGroupedParameterBuilder builder, IMartenSession session) { - var param = builder.AppendParameter(Stream.Events.Select(x => x.UserName).ToArray()); + var events = Stream.Events; + var count = events.Count; + var userNames = new string[count]; + for (int i = 0; i < count; i++) userNames[i] = events[i].UserName; + + var param = builder.AppendParameter(userNames); param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.Varchar; } protected void writeTimestamps(IGroupedParameterBuilder builder) { - var param = builder.AppendParameter(Stream.Events.Select(x => x.Timestamp).ToArray()); + var events = Stream.Events; + var count = events.Count; + var timestamps = new DateTimeOffset[count]; + for (int i = 0; i < count; i++) timestamps[i] = events[i].Timestamp; + + var param = builder.AppendParameter(timestamps); param.NpgsqlDbType = NpgsqlDbType.Array | NpgsqlDbType.TimestampTz; } @@ -140,9 +182,10 @@ public async Task PostprocessAsync(DbDataReader reader, IList excepti var values = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); var finalVersion = values[0]; - foreach (var e in Stream.Events.Reverse()) + var events = Stream.Events; + for (int i = events.Count - 1; i >= 0; i--) { - e.Version = finalVersion; + events[i].Version = finalVersion; finalVersion--; } @@ -150,10 +193,10 @@ public async Task PostprocessAsync(DbDataReader reader, IList excepti for (int i = 1; i < values.Length; i++) { // Only setting the sequence to aid in tombstone processing - Stream.Events[i - 1].Sequence = values[i]; + events[i - 1].Sequence = values[i]; } - if (Events is { UseMandatoryStreamTypeDeclaration: true } && Stream.Events[0].Version == 1) + if (Events is { UseMandatoryStreamTypeDeclaration: true } && events[0].Version == 1) { throw new NonExistentStreamException(Events.StreamIdentity == StreamIdentity.AsGuid ? Stream.Id diff --git a/src/Marten/Internal/DirtyTracking/ChangeTracker.cs b/src/Marten/Internal/DirtyTracking/ChangeTracker.cs index b585f8d4f5..caf0788e8d 100644 --- a/src/Marten/Internal/DirtyTracking/ChangeTracker.cs +++ b/src/Marten/Internal/DirtyTracking/ChangeTracker.cs @@ -1,3 +1,4 @@ +using System; using System.Diagnostics.CodeAnalysis; using Marten.Internal.Operations; using Newtonsoft.Json.Linq; @@ -20,6 +21,15 @@ public ChangeTracker(IMartenSession session, T document) public bool DetectChanges(IMartenSession session, [NotNullWhen(true)]out IStorageOperation? operation) { var newJson = session.Serializer.ToCleanJson(_document); + + // Fast path: if the JSON strings are identical, skip expensive parsing + if (string.Equals(_json, newJson, StringComparison.Ordinal)) + { + operation = null; + return false; + } + + // Slow path: parse and deep-compare to handle semantically equivalent JSON if (JToken.DeepEquals(JObject.Parse(_json), JObject.Parse(newJson))) { operation = null; diff --git a/src/Marten/Internal/Sessions/DirtyCheckingDocumentSession.cs b/src/Marten/Internal/Sessions/DirtyCheckingDocumentSession.cs index 24f123879e..78529ed263 100644 --- a/src/Marten/Internal/Sessions/DirtyCheckingDocumentSession.cs +++ b/src/Marten/Internal/Sessions/DirtyCheckingDocumentSession.cs @@ -1,5 +1,6 @@ #nullable enable using System; +using System.Collections.Generic; using System.Linq; using JasperFx.Core; using Marten.Internal.Operations; @@ -37,16 +38,18 @@ protected internal override void resetDirtyChecking() { foreach (var tracker in ChangeTrackers) tracker.Reset(this); - var knownDocuments = ChangeTrackers.Select(x => x.Document).ToArray(); + var knownDocuments = new HashSet( + ChangeTrackers.Select(x => x.Document), + ReferenceEqualityComparer.Instance); - var operations = _workTracker.AllOperations - .OfType() - .Where(x => !knownDocuments.Contains(x.Document)); - - foreach (var operation in operations) + var operations = _workTracker.AllOperations; + for (int i = 0; i < operations.Count; i++) { - var tracker = operation.ToTracker(this); - ChangeTrackers.Add(tracker); + if (operations[i] is IDocumentStorageOperation op && !knownDocuments.Contains(op.Document)) + { + var tracker = op.ToTracker(this); + ChangeTrackers.Add(tracker); + } } } diff --git a/src/Marten/Internal/Storage/DocumentStorage.cs b/src/Marten/Internal/Storage/DocumentStorage.cs index 115aa2d3a6..b1a86f43b8 100644 --- a/src/Marten/Internal/Storage/DocumentStorage.cs +++ b/src/Marten/Internal/Storage/DocumentStorage.cs @@ -45,6 +45,7 @@ public abstract class DocumentStorage: IDocumentStorage, IHaveMe private readonly string _selectClause; private readonly string[] _selectFields; private ISqlFragment? _defaultWhere; + private MetadataColumn[]? _metadataColumns; protected Action _setter; protected Action _setFromString = (_, _) => throw new NotSupportedException(); protected Action _setFromGuid = (_, _) => throw new NotSupportedException(); @@ -147,7 +148,7 @@ public ISelectClause SelectClauseWithDuplicatedFields MetadataColumn[] IHaveMetadataColumns.MetadataColumns() { - return _mapping.Schema.Table.Columns.OfType().ToArray(); + return _metadataColumns ??= _mapping.Schema.Table.Columns.OfType().ToArray(); } public IQueryableMemberCollection QueryMembers => _mapping.QueryMembers; diff --git a/src/Marten/Internal/UnitOfWork.cs b/src/Marten/Internal/UnitOfWork.cs index 9991f7621e..96614f1239 100644 --- a/src/Marten/Internal/UnitOfWork.cs +++ b/src/Marten/Internal/UnitOfWork.cs @@ -54,12 +54,26 @@ public void Add(IStorageOperation operation) } } - public IReadOnlyList AllOperations => _eventOperations.Concat(_operations).ToList(); + public IReadOnlyList AllOperations + { + get + { + if (_eventOperations.Count == 0) return _operations; + if (_operations.Count == 0) return _eventOperations; + + var combined = new List(_eventOperations.Count + _operations.Count); + combined.AddRange(_eventOperations); + combined.AddRange(_operations); + return combined; + } + } public void Sort(StoreOptions options) { if (shouldSort(options, out var comparer)) { + // Must use a stable sort to preserve insertion order for operations + // on the same document type (required for self-referencing foreign keys) var sorted = _operations.OrderBy(f => f, comparer).ToList(); _operations.Clear(); _operations.AddRange(sorted); diff --git a/src/Marten/Internal/UpdateBatch.cs b/src/Marten/Internal/UpdateBatch.cs index f92ec2ee24..d5c2c6e188 100644 --- a/src/Marten/Internal/UpdateBatch.cs +++ b/src/Marten/Internal/UpdateBatch.cs @@ -53,18 +53,21 @@ private IEnumerable buildPages(IMartenSession session) else { var count = 0; + var batchSize = session.Options.UpdateBatchSize; while (count < _operations.Count) { - var operations = _operations - .Skip(count) - .Take(session.Options.UpdateBatchSize) - .ToArray(); + var remaining = Math.Min(batchSize, _operations.Count - count); + var operations = new IStorageOperation[remaining]; + for (int i = 0; i < remaining; i++) + { + operations[i] = _operations[count + i]; + } var page = new OperationPage(session, operations); yield return page; - count += session.Options.UpdateBatchSize; + count += batchSize; } } } diff --git a/src/Marten/Linq/QueryHandlers/NamedParameterHelper.cs b/src/Marten/Linq/QueryHandlers/NamedParameterHelper.cs index 6409482c3c..598e2e398c 100644 --- a/src/Marten/Linq/QueryHandlers/NamedParameterHelper.cs +++ b/src/Marten/Linq/QueryHandlers/NamedParameterHelper.cs @@ -1,6 +1,8 @@ #nullable enable using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Reflection; using NpgsqlTypes; using Weasel.Postgresql; @@ -14,6 +16,8 @@ namespace Marten.Linq.QueryHandlers; /// internal static class NamedParameterHelper { + private static readonly ConcurrentDictionary _propertyCache = new(); + /// /// Appends the SQL to the builder, replacing :paramName references with positional /// parameters that have proper NpgsqlDbType set based on the property's declared type. @@ -21,8 +25,9 @@ internal static class NamedParameterHelper /// public static void AppendSqlWithNamedParameters(ICommandBuilder builder, string sql, object parameters) { - var properties = parameters.GetType().GetProperties(); - var propLookup = new Dictionary(StringComparer.OrdinalIgnoreCase); + var parameterType = parameters.GetType(); + var properties = _propertyCache.GetOrAdd(parameterType, static t => t.GetProperties()); + var propLookup = new Dictionary(properties.Length, StringComparer.OrdinalIgnoreCase); foreach (var property in properties) { propLookup[property.Name] = (property.GetValue(parameters), property.PropertyType);