diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeDirtyTrackingSelector.cs b/src/Marten/Internal/ClosedShape/ClosedShapeDirtyTrackingSelector.cs index 173e1fb549..75b80db0c2 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeDirtyTrackingSelector.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeDirtyTrackingSelector.cs @@ -1,5 +1,4 @@ #nullable enable -using System; using System.Collections.Generic; using System.Data.Common; using System.Threading; @@ -12,29 +11,27 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M2): for the -/// path. -/// Identity-map writes (like ) -/// plus a registered on the session for -/// every loaded document — gives SaveChangesAsync a baseline to -/// compare against when dirty-checking which loaded docs were modified. +/// Abstract base for the per- closed-shape +/// DirtyTracking . Identity-map writes (like +/// ) plus a +/// registered on the session per loaded +/// document. Sealed concurrency-specific subclasses override +/// CaptureVersion (#4659). /// -internal sealed class ClosedShapeDirtyTrackingSelector: ISelector, IDocumentSelector +internal abstract class ClosedShapeDirtyTrackingSelector: ISelector, IDocumentSelector where T : notnull where TId : notnull { - private const int IdColumn = 0; - private const int DataColumn = 1; - private const int FirstMetadataColumn = 2; + protected const int IdColumn = 0; + protected const int DataColumn = 1; + protected const int FirstMetadataColumn = 2; - private readonly IMartenSession _session; - private readonly ISerializer _serializer; - private readonly DocumentStorageDescriptor _descriptor; - private readonly Dictionary _identityMap; - private readonly Dictionary? _versions; - private readonly Dictionary? _revisions; + protected readonly IMartenSession _session; + protected readonly ISerializer _serializer; + protected readonly DocumentStorageDescriptor _descriptor; + protected readonly Dictionary _identityMap; - public ClosedShapeDirtyTrackingSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + protected ClosedShapeDirtyTrackingSelector(IMartenSession session, DocumentStorageDescriptor descriptor) { _session = session; _serializer = session.Serializer; @@ -49,13 +46,6 @@ public ClosedShapeDirtyTrackingSelector(IMartenSession session, DocumentStorageD _identityMap = new Dictionary(); session.ItemMap[typeof(T)] = _identityMap; } - - _versions = descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic - ? session.Versions.ForType() - : null; - _revisions = descriptor.ConcurrencyMode == ConcurrencyMode.Numeric - ? session.Versions.RevisionsFor() - : null; } public T Resolve(DbDataReader reader) @@ -96,7 +86,12 @@ public async Task ResolveAsync(DbDataReader reader, CancellationToken token) return doc; } - private T ReadDocument(DbDataReader reader) + /// + /// Concurrency-specific per-row version capture. + /// + protected abstract void CaptureVersion(DbDataReader reader, TId id); + + protected T ReadDocument(DbDataReader reader) { if (_descriptor.HierarchyMapping is { } hierarchy) { @@ -106,7 +101,7 @@ private T ReadDocument(DbDataReader reader) return _serializer.FromJson(reader, DataColumn); } - private async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader reader, CancellationToken token) + protected async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader reader, CancellationToken token) { if (_descriptor.HierarchyMapping is { } hierarchy) { @@ -116,7 +111,7 @@ private async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader return await _serializer.FromJsonAsync(reader, DataColumn, token).ConfigureAwait(false); } - private void ApplyMetadata(DbDataReader reader, T document) + protected void ApplyMetadata(DbDataReader reader, T document) { var ordinal = FirstMetadataColumn; foreach (var binder in _descriptor.ReadBinders) @@ -125,21 +120,4 @@ private void ApplyMetadata(DbDataReader reader, T document) ordinal++; } } - - private void CaptureVersion(DbDataReader reader, TId id) - { - var versionIndex = _descriptor.VersionReadIndex; - if (versionIndex < 0) return; - var versionOrdinal = FirstMetadataColumn + versionIndex; - if (reader.IsDBNull(versionOrdinal)) return; - - if (_versions is not null) - { - _versions[id] = reader.GetFieldValue(versionOrdinal); - } - else if (_revisions is not null) - { - _revisions[id] = reader.GetFieldValue(versionOrdinal); - } - } } diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeIdentityMapSelector.cs b/src/Marten/Internal/ClosedShape/ClosedShapeIdentityMapSelector.cs index 244755d887..321f68f682 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeIdentityMapSelector.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeIdentityMapSelector.cs @@ -1,5 +1,4 @@ #nullable enable -using System; using System.Collections.Generic; using System.Data.Common; using System.Threading; @@ -11,37 +10,27 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M2): for the -/// path. Reads -/// id+data+metadata and writes (id, doc) into the session's -/// identity-map dictionary so subsequent LoadAsync calls -/// short-circuit to the in-memory instance. +/// Abstract base for the per- closed-shape +/// IdentityMap . Owns the identity-map cache +/// init + lookup; sealed subclasses provide a monomorphic +/// CaptureVersion override so the per-row hot path doesn't read +/// ConcurrencyMode (#4659). /// -/// -/// The identity map lives on keyed by -/// document type. The selector acquires it (or creates a fresh one) at -/// construction so per-row writes don't re-walk the dictionary lookup. -/// Mirrors what the codegen-emitted DocumentSelectorWithIdentityMap -/// subclass does today. -/// -internal sealed class ClosedShapeIdentityMapSelector: ISelector, IDocumentSelector +internal abstract class ClosedShapeIdentityMapSelector: ISelector, IDocumentSelector where T : notnull where TId : notnull { // Same column layout as Lightweight: id at 0, data at 1, metadata at 2+. - // IdColumn.ShouldSelect is true for all non-QueryOnly styles. - private const int IdColumn = 0; - private const int DataColumn = 1; - private const int FirstMetadataColumn = 2; + protected const int IdColumn = 0; + protected const int DataColumn = 1; + protected const int FirstMetadataColumn = 2; - private readonly IMartenSession _session; - private readonly ISerializer _serializer; - private readonly DocumentStorageDescriptor _descriptor; - private readonly Dictionary _identityMap; - private readonly Dictionary? _versions; - private readonly Dictionary? _revisions; + protected readonly IMartenSession _session; + protected readonly ISerializer _serializer; + protected readonly DocumentStorageDescriptor _descriptor; + protected readonly Dictionary _identityMap; - public ClosedShapeIdentityMapSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + protected ClosedShapeIdentityMapSelector(IMartenSession session, DocumentStorageDescriptor descriptor) { _session = session; _serializer = session.Serializer; @@ -56,13 +45,6 @@ public ClosedShapeIdentityMapSelector(IMartenSession session, DocumentStorageDes _identityMap = new Dictionary(); session.ItemMap[typeof(T)] = _identityMap; } - - _versions = descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic - ? session.Versions.ForType() - : null; - _revisions = descriptor.ConcurrencyMode == ConcurrencyMode.Numeric - ? session.Versions.RevisionsFor() - : null; } public T Resolve(DbDataReader reader) @@ -106,7 +88,13 @@ public async Task ResolveAsync(DbDataReader reader, CancellationToken token) return doc; } - private T ReadDocument(DbDataReader reader) + /// + /// Concurrency-specific per-row version capture. Off-mode subclasses + /// no-op; Optimistic / Numeric capture into their typed tracker. + /// + protected abstract void CaptureVersion(DbDataReader reader, TId id); + + protected T ReadDocument(DbDataReader reader) { if (_descriptor.HierarchyMapping is { } hierarchy) { @@ -116,7 +104,7 @@ private T ReadDocument(DbDataReader reader) return _serializer.FromJson(reader, DataColumn); } - private async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader reader, CancellationToken token) + protected async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader reader, CancellationToken token) { if (_descriptor.HierarchyMapping is { } hierarchy) { @@ -126,7 +114,7 @@ private async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader return await _serializer.FromJsonAsync(reader, DataColumn, token).ConfigureAwait(false); } - private void ApplyMetadata(DbDataReader reader, T document) + protected void ApplyMetadata(DbDataReader reader, T document) { var ordinal = FirstMetadataColumn; foreach (var binder in _descriptor.ReadBinders) @@ -135,21 +123,4 @@ private void ApplyMetadata(DbDataReader reader, T document) ordinal++; } } - - private void CaptureVersion(DbDataReader reader, TId id) - { - var versionIndex = _descriptor.VersionReadIndex; - if (versionIndex < 0) return; - var versionOrdinal = FirstMetadataColumn + versionIndex; - if (reader.IsDBNull(versionOrdinal)) return; - - if (_versions is not null) - { - _versions[id] = reader.GetFieldValue(versionOrdinal); - } - else if (_revisions is not null) - { - _revisions[id] = reader.GetFieldValue(versionOrdinal); - } - } } diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeInsertOperation.cs b/src/Marten/Internal/ClosedShape/ClosedShapeInsertOperation.cs index 58574c2365..e93d5f3f5f 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeInsertOperation.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeInsertOperation.cs @@ -4,10 +4,6 @@ using System.Data.Common; using System.Threading; using System.Threading.Tasks; -using JasperFx; -using JasperFx.Core; -using Marten.Exceptions; -using Marten.Internal; using Marten.Internal.Operations; using Npgsql; using NpgsqlTypes; @@ -17,56 +13,51 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M3+M7+M8): hand-written Insert operation. Emits -/// INSERT … ON CONFLICT (id) DO NOTHING RETURNING {id|mt_version}. -/// Parameter ordering matches : -/// id, data, then each client-side metadata binder. RETURNING lets -/// distinguish "row inserted" from "row already -/// existed" and raise in the -/// latter case. +/// Abstract base for the per- closed-shape +/// Insert operation. Holds the shared infrastructure (document, identity, +/// tenant, descriptor, interface boilerplate) but pushes the parameter- +/// binding and postprocess decisions onto sealed concurrency-specific +/// subclasses so the hot path doesn't read ConcurrencyMode at +/// runtime (#4659). /// /// -/// Under the operation generates -/// the new Guid version client-side at construction time; under -/// it binds the caller-supplied -/// (or default 0 = auto-increment to 1) Revision. -/// Either way the new value is written back onto the document + -/// session.Versions in postprocess. +/// +/// Subclasses: +/// +/// — +/// ConcurrencyMode.Off; no version/revision binders to special-case. +/// — +/// ConcurrencyMode.Optimistic; binds + tracks a Guid version +/// per row. +/// — +/// ConcurrencyMode.Numeric; binds a (2–4) revision-CASE block +/// and tracks the returned revision. +/// +/// /// -internal sealed class ClosedShapeInsertOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform +internal abstract class ClosedShapeInsertOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform where TDoc : notnull where TId : notnull { - public TId Id => _id; - - private readonly TDoc _document; - private readonly TId _id; - private readonly string _tenantId; - private readonly DocumentStorageDescriptor _descriptor; - private readonly Dictionary? _versions; - private readonly Dictionary? _revisions; - private readonly Guid _newVersion; + protected readonly TDoc _document; + protected readonly TId _id; + protected readonly string _tenantId; + protected readonly DocumentStorageDescriptor _descriptor; - public ClosedShapeInsertOperation( + protected ClosedShapeInsertOperation( TDoc document, TId id, string tenantId, - DocumentStorageDescriptor descriptor, - Dictionary? versions, - Dictionary? revisions) + DocumentStorageDescriptor descriptor) { _document = document; _id = id; _tenantId = tenantId; _descriptor = descriptor; - _versions = versions; - _revisions = revisions; - if (descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic) - { - _newVersion = CombGuidIdGeneration.NewGuid(); - } } + public TId Id => _id; + public long Revision { get; set; } public bool IgnoreConcurrencyViolation { get; set; } @@ -80,15 +71,22 @@ public Marten.Internal.DirtyTracking.IChangeTracker ToTracker(IMartenSession ses public OperationRole Role() => OperationRole.Insert; - public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + public abstract void ConfigureCommand(ICommandBuilder builder, IMartenSession session); + + public abstract Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token); + + /// + /// Bind the leading [tenant_id, ] id, data parameter triple and + /// project session-derived metadata onto the document before + /// serialization. Returns the next free parameter slot. + /// + /// + /// Mirrors the codegen path's GenerateCodeToModifyDocument frames: + /// Correlation / Causation / Headers / LastModifiedBy etc. land on + /// the document so they flow into the JSON data column too. + /// + protected int BindLeadingParameters(NpgsqlParameter[] parameters, IMartenSession session) { - // Parameter ordering matches the descriptor's SQL: - // non-conjoined: id (0), data (1), client-side binders (2+) - // conjoined: tenant_id (0), id (1), data (2), binders (3+) - // Under Numeric mode, the revision binder consumes TWO ? slots - // (the CASE WHEN ? = 0 THEN 1 ELSE ? END expression). - var parameters = builder.AppendWithParameters(_descriptor.InsertSql, '?'); - var slot = 0; if (_descriptor.IsConjoined) { @@ -101,10 +99,6 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); slot++; - // Project session-derived metadata (Correlation/Causation/ - // Headers/LastModifiedBy) onto the document BEFORE serialization - // so the values flow into the JSON data column too. Mirrors the - // codegen path's GenerateCodeToModifyDocument frames. foreach (var binder in _descriptor.WriteBinders) { binder.ApplyToDocument(_document, session); @@ -113,90 +107,31 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) session.Serializer.WriteToParameter(parameters[slot], _document); slot++; - foreach (var binder in _descriptor.ClientSideWriteBinders) - { - slot = BindBinder(parameters, slot, binder, session); - } - } - - public async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) - { - if (!await reader.ReadAsync(token).ConfigureAwait(false)) - { - exceptions.Add(new DocumentAlreadyExistsException(null, typeof(TDoc), _id)); - return; - } - - ApplyConcurrencyResult(reader); + return slot; } - private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + /// + /// Bind the optional id [, tenant_id] subquery slots that + /// UseVersionFromMatchingStream emits inside the revision + /// CASE expression. Returns the next free slot. Common to the + /// Numeric variants of Insert / Upsert / Overwrite. + /// + protected int BindUseVersionFromMatchingStreamSubquery(NpgsqlParameter[] parameters, int slot) { - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic && - ReferenceEquals(binder, _descriptor.VersionBinder)) - { - parameters[slot].Value = _newVersion; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; - _descriptor.VersionBinder.ApplyVersionTo(_document, _newVersion); - return slot + 1; - } + parameters[slot].Value = _descriptor.Identification.ToRawSqlValue(_id); + parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); + slot++; - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric && - ReferenceEquals(binder, _descriptor.RevisionBinder)) + if (_descriptor.IsConjoined) { - // Numeric path. Two layouts depending on UseVersionFromMatchingStream: - // Default: CASE WHEN ? = 0 THEN 1 ELSE ? END (2 slots) - // UseVersionFromMatchingStream (non-conjoined): CASE WHEN ? = 0 - // THEN COALESCE((select version from .mt_streams where id = ?), 1) - // ELSE ? END (3 slots: ?=0 check, subquery id, explicit revision) - // UseVersionFromMatchingStream + conjoined: same with extra ? for tenant_id (4 slots) - // #4614: parameter type follows the column width (integer vs bigint). - var revisionDbType = _descriptor.RevisionBinder.ColumnDbType; - var revisionValue = revisionDbType == NpgsqlDbType.Integer - ? (object)checked((int)Revision) - : Revision; - parameters[slot].Value = revisionValue; - parameters[slot].NpgsqlDbType = revisionDbType; + parameters[slot].Value = _tenantId; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Varchar; slot++; - - if (_descriptor.UseVersionFromMatchingStream) - { - parameters[slot].Value = _descriptor.Identification.ToRawSqlValue(_id); - parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); - slot++; - - if (_descriptor.IsConjoined) - { - parameters[slot].Value = _tenantId; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Varchar; - slot++; - } - } - - parameters[slot].Value = revisionValue; - parameters[slot].NpgsqlDbType = revisionDbType; - return slot + 1; } - binder.BindParameter(parameters[slot], _document, session); - return slot + 1; + return slot; } - private void ApplyConcurrencyResult(DbDataReader reader) - { - switch (_descriptor.ConcurrencyMode) - { - case ConcurrencyMode.Optimistic: - _versions![_id] = _newVersion; - break; - case ConcurrencyMode.Numeric: - var newRevision = reader.GetFieldValue(0); - _revisions![_id] = newRevision; - _descriptor.RevisionBinder?.ApplyRevisionTo(_document, newRevision); - break; - } - } public bool TryTransform(System.Exception original, out System.Exception? transformed) => ClosedShapeOperationExceptionTransform.TryTransform(original, _descriptor.TableName, typeof(TDoc), _id!, out transformed); - } diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeLightweightSelector.cs b/src/Marten/Internal/ClosedShape/ClosedShapeLightweightSelector.cs index 087c2684b1..2811b24c61 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeLightweightSelector.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeLightweightSelector.cs @@ -1,6 +1,4 @@ #nullable enable -using System; -using System.Collections.Generic; using System.Data.Common; using System.Threading; using System.Threading.Tasks; @@ -11,50 +9,42 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M1+M2+M7): for the lightweight -/// closed-shape storage path. Reads the data column at index 1 and -/// dispatches each .Apply at -/// the binder's column position (2, 3, …). Lightweight skips -/// identity-map writes — every LoadAsync hits the database. -/// Under the selector also -/// captures each row's mt_version into session.Versions -/// so subsequent updates can supply it as the expected version. +/// Abstract base for the per- closed-shape +/// Lightweight . Owns the shared row-shape +/// (id at col 0, data at col 1, metadata at 2+) plus document +/// deserialization and metadata-apply. The per-row +/// CaptureVersion step is virtual; sealed subclasses provide a +/// monomorphic implementation so the JIT can devirtualize the hot +/// path (#4659). /// -internal sealed class ClosedShapeLightweightSelector: ISelector, IDocumentSelector +internal abstract class ClosedShapeLightweightSelector: ISelector, IDocumentSelector where T : notnull where TId : notnull { // Lightweight column order from DocumentTable.SelectColumns: // id (col 0), data (col 1), then ShouldSelect metadata columns. - private const int IdColumn = 0; - private const int DataColumn = 1; - private const int FirstMetadataColumn = 2; + protected const int IdColumn = 0; + protected const int DataColumn = 1; + protected const int FirstMetadataColumn = 2; - private readonly IMartenSession _session; - private readonly ISerializer _serializer; - private readonly DocumentStorageDescriptor _descriptor; - private readonly Dictionary? _versions; - private readonly Dictionary? _revisions; + protected readonly IMartenSession _session; + protected readonly ISerializer _serializer; + protected readonly DocumentStorageDescriptor _descriptor; - public ClosedShapeLightweightSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + protected ClosedShapeLightweightSelector(IMartenSession session, DocumentStorageDescriptor descriptor) { _session = session; _serializer = session.Serializer; _descriptor = descriptor; - _versions = descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic - ? session.Versions.ForType() - : null; - _revisions = descriptor.ConcurrencyMode == ConcurrencyMode.Numeric - ? session.Versions.RevisionsFor() - : null; } public T Resolve(DbDataReader reader) { var doc = ReadDocument(reader); ApplyMetadata(reader, doc); - CaptureVersion(reader); - _session.MarkAsDocumentLoaded(_descriptor.Identification.ReadIdFromReader(reader, IdColumn), doc); + var id = _descriptor.Identification.ReadIdFromReader(reader, IdColumn); + CaptureVersion(reader, id); + _session.MarkAsDocumentLoaded(id, doc); return doc; } @@ -62,12 +52,20 @@ public async Task ResolveAsync(DbDataReader reader, CancellationToken token) { var doc = await ReadDocumentAsync(reader, token).ConfigureAwait(false); ApplyMetadata(reader, doc); - CaptureVersion(reader); - _session.MarkAsDocumentLoaded(_descriptor.Identification.ReadIdFromReader(reader, IdColumn), doc); + var id = _descriptor.Identification.ReadIdFromReader(reader, IdColumn); + CaptureVersion(reader, id); + _session.MarkAsDocumentLoaded(id, doc); return doc; } - private T ReadDocument(DbDataReader reader) + /// + /// Concurrency-specific per-row version capture. Off-mode subclasses + /// no-op; Optimistic captures the Guid into the per-type version + /// dict; Numeric captures the long into the per-type revision dict. + /// + protected abstract void CaptureVersion(DbDataReader reader, TId id); + + protected T ReadDocument(DbDataReader reader) { if (_descriptor.HierarchyMapping is { } hierarchy) { @@ -77,7 +75,7 @@ private T ReadDocument(DbDataReader reader) return _serializer.FromJson(reader, DataColumn); } - private async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader reader, CancellationToken token) + protected async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader reader, CancellationToken token) { if (_descriptor.HierarchyMapping is { } hierarchy) { @@ -87,7 +85,7 @@ private async System.Threading.Tasks.ValueTask ReadDocumentAsync(DbDataReader return await _serializer.FromJsonAsync(reader, DataColumn, token).ConfigureAwait(false); } - private void ApplyMetadata(DbDataReader reader, T document) + protected void ApplyMetadata(DbDataReader reader, T document) { var ordinal = FirstMetadataColumn; foreach (var binder in _descriptor.ReadBinders) @@ -96,23 +94,4 @@ private void ApplyMetadata(DbDataReader reader, T document) ordinal++; } } - - private void CaptureVersion(DbDataReader reader) - { - var versionIndex = _descriptor.VersionReadIndex; - if (versionIndex < 0) return; - var versionOrdinal = FirstMetadataColumn + versionIndex; - if (reader.IsDBNull(versionOrdinal)) return; - - if (_versions is not null) - { - var id = _descriptor.Identification.ReadIdFromReader(reader, IdColumn); - _versions[id] = reader.GetFieldValue(versionOrdinal); - } - else if (_revisions is not null) - { - var id = _descriptor.Identification.ReadIdFromReader(reader, IdColumn); - _revisions[id] = reader.GetFieldValue(versionOrdinal); - } - } } diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeOverwriteOperation.cs b/src/Marten/Internal/ClosedShape/ClosedShapeOverwriteOperation.cs index 0e1bc7a86d..c26ed8f5db 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeOverwriteOperation.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeOverwriteOperation.cs @@ -4,8 +4,6 @@ using System.Data.Common; using System.Threading; using System.Threading.Tasks; -using JasperFx.Core; -using Marten.Internal; using Marten.Internal.Operations; using Npgsql; using NpgsqlTypes; @@ -15,49 +13,39 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M7+M8): hand-written Overwrite operation. Same shape as -/// except the -/// trailing concurrency guard is dropped — the caller has explicitly -/// asked to bypass the optimistic / numeric check +/// Abstract base for the per- closed-shape +/// Overwrite operation. Same shape as Upsert but the trailing +/// concurrency guard is dropped — the caller has explicitly opted to +/// bypass the optimistic / numeric check /// (session.Store(doc, ignoreConcurrencyCheck: true) or -/// session -/// option). Under overwrite is -/// functionally identical to upsert. +/// ). Sealed +/// subclasses provide the concrete + +/// bodies so the hot path doesn't branch +/// on ConcurrencyMode (#4659). /// -internal sealed class ClosedShapeOverwriteOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform +internal abstract class ClosedShapeOverwriteOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform where TDoc : notnull where TId : notnull { - public TId Id => _id; - - private readonly TDoc _document; - private readonly TId _id; - private readonly string _tenantId; - private readonly DocumentStorageDescriptor _descriptor; - private readonly Dictionary? _versions; - private readonly Dictionary? _revisions; - private readonly Guid _newVersion; + protected readonly TDoc _document; + protected readonly TId _id; + protected readonly string _tenantId; + protected readonly DocumentStorageDescriptor _descriptor; - public ClosedShapeOverwriteOperation( + protected ClosedShapeOverwriteOperation( TDoc document, TId id, string tenantId, - DocumentStorageDescriptor descriptor, - Dictionary? versions, - Dictionary? revisions) + DocumentStorageDescriptor descriptor) { _document = document; _id = id; _tenantId = tenantId; _descriptor = descriptor; - _versions = versions; - _revisions = revisions; - if (descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic) - { - _newVersion = CombGuidIdGeneration.NewGuid(); - } } + public TId Id => _id; + public long Revision { get; set; } public bool IgnoreConcurrencyViolation { get; set; } @@ -71,10 +59,17 @@ public Marten.Internal.DirtyTracking.IChangeTracker ToTracker(IMartenSession ses public OperationRole Role() => OperationRole.Update; - public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) - { - var parameters = builder.AppendWithParameters(_descriptor.OverwriteSql, '?'); + public abstract void ConfigureCommand(ICommandBuilder builder, IMartenSession session); + + public abstract Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token); + /// + /// Bind [tenant_id,] id, data + the client-side write binders + /// up to (not including) the trailing ON CONFLICT SET concurrency + /// slots. Returns the next free parameter slot. + /// + protected int BindPreOnConflictParameters(NpgsqlParameter[] parameters, IMartenSession session) + { var slot = 0; if (_descriptor.IsConjoined) { @@ -87,8 +82,6 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); slot++; - // Project session-derived metadata onto the document BEFORE - // serialization so the values flow into the JSON data column. foreach (var binder in _descriptor.WriteBinders) { binder.ApplyToDocument(_document, session); @@ -99,102 +92,39 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) foreach (var binder in _descriptor.ClientSideWriteBinders) { - slot = BindBinder(parameters, slot, binder, session); - } - - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric) - { - // DO UPDATE SET mt_version = CASE WHEN ? = 0 THEN current+1 ELSE ? END - // No WHERE guard — Overwrite always wins. - parameters[slot].Value = Revision; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint; - parameters[slot + 1].Value = Revision; - parameters[slot + 1].NpgsqlDbType = NpgsqlDbType.Bigint; + slot = BindClientSideBinder(parameters, slot, binder, session); } - } - - public async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) - { - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Off) return; - if (await reader.ReadAsync(token).ConfigureAwait(false)) - { - ApplyConcurrencyResult(reader); - } + return slot; } - private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + /// + /// Bind the optional id [, tenant_id] subquery slots that + /// UseVersionFromMatchingStream emits inside the revision CASE + /// expression. + /// + protected int BindUseVersionFromMatchingStreamSubquery(NpgsqlParameter[] parameters, int slot) { - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic && - ReferenceEquals(binder, _descriptor.VersionBinder)) - { - parameters[slot].Value = _newVersion; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; - _descriptor.VersionBinder.ApplyVersionTo(_document, _newVersion); - return slot + 1; - } + parameters[slot].Value = _descriptor.Identification.ToRawSqlValue(_id); + parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); + slot++; - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric && - ReferenceEquals(binder, _descriptor.RevisionBinder)) + if (_descriptor.IsConjoined) { - // INSERT VALUES side. Two layouts depending on UseVersionFromMatchingStream: - // Default: CASE WHEN ? = 0 THEN 1 ELSE ? END (2 slots) - // UseVersionFromMatchingStream (non-conjoined): CASE WHEN ? = 0 - // THEN COALESCE((select version from .mt_streams where id = ?), 1) - // ELSE ? END (3 slots) - // UseVersionFromMatchingStream + conjoined: add an extra ? for tenant_id (4 slots) - parameters[slot].Value = Revision; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint; + parameters[slot].Value = _tenantId; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Varchar; slot++; - - if (_descriptor.UseVersionFromMatchingStream) - { - parameters[slot].Value = _descriptor.Identification.ToRawSqlValue(_id); - parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); - slot++; - - if (_descriptor.IsConjoined) - { - parameters[slot].Value = _tenantId; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Varchar; - slot++; - } - } - - parameters[slot].Value = Revision; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint; - return slot + 1; } - binder.BindParameter(parameters[slot], _document, session); - return slot + 1; + return slot; } - private void ApplyConcurrencyResult(DbDataReader reader) - { - switch (_descriptor.ConcurrencyMode) - { - case ConcurrencyMode.Optimistic: - // Trackers are null when built via DocumentStorage.OverwriteProjected - // (async-daemon projection path) — see issue #4657. - if (_versions is not null) - { - _versions[_id] = _newVersion; - } - break; - case ConcurrencyMode.Numeric: - // Must still consume the RETURNING row so the OperationPage cursor - // stays aligned; only the tracker write is conditional. - var newRevision = reader.GetFieldValue(0); - if (_revisions is not null) - { - _revisions[_id] = newRevision; - } - _descriptor.RevisionBinder?.ApplyRevisionTo(_document, newRevision); - break; - } - } + /// + /// Concurrency-aware subclasses override to special-case the + /// VersionBinder / RevisionBinder. + /// + protected abstract int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session); + public bool TryTransform(System.Exception original, out System.Exception? transformed) => ClosedShapeOperationExceptionTransform.TryTransform(original, _descriptor.TableName, typeof(TDoc), _id!, out transformed); - } diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeRegistration.cs b/src/Marten/Internal/ClosedShape/ClosedShapeRegistration.cs index 64f4961465..28bb878342 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeRegistration.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeRegistration.cs @@ -177,9 +177,14 @@ private static DocumentProvider BuildProvider( var descriptor = DocumentStorageDescriptorBuilder.Build(mapping, identification); var queryOnly = new QueryOnlyClosedShapeStorage(mapping, descriptor); - var lightweight = new LightweightClosedShapeStorage(mapping, descriptor); - var identityMap = new IdentityMapClosedShapeStorage(mapping, descriptor); - var dirtyTracking = new DirtyCheckedClosedShapeStorage(mapping, descriptor); + + // #4659: dispatch on ConcurrencyMode ONCE at registration time so the + // three writeable storage styles are monomorphic-by-construction + // (each leaf knows its concurrency mode at type identity, no per- + // operation / per-row reads of _descriptor.ConcurrencyMode left). + var lightweight = BuildLightweightStorage(mapping, descriptor); + var identityMap = BuildIdentityMapStorage(mapping, descriptor); + var dirtyTracking = BuildDirtyCheckedStorage(mapping, descriptor); // M16: real bulk loader, COPY-based, built from the descriptor's // column list. Lightweight is fine to use as the storage backing @@ -193,4 +198,37 @@ private static DocumentProvider BuildProvider( identityMap: identityMap, dirtyTracking: dirtyTracking); } + + private static LightweightClosedShapeStorage BuildLightweightStorage( + DocumentMapping mapping, DocumentStorageDescriptor descriptor) + where TDoc : notnull + where TId : notnull + => descriptor.ConcurrencyMode switch + { + ConcurrencyMode.Optimistic => new OptimisticLightweightClosedShapeStorage(mapping, descriptor), + ConcurrencyMode.Numeric => new NumericLightweightClosedShapeStorage(mapping, descriptor), + _ => new UnversionedLightweightClosedShapeStorage(mapping, descriptor), + }; + + private static IdentityMapClosedShapeStorage BuildIdentityMapStorage( + DocumentMapping mapping, DocumentStorageDescriptor descriptor) + where TDoc : notnull + where TId : notnull + => descriptor.ConcurrencyMode switch + { + ConcurrencyMode.Optimistic => new OptimisticIdentityMapClosedShapeStorage(mapping, descriptor), + ConcurrencyMode.Numeric => new NumericIdentityMapClosedShapeStorage(mapping, descriptor), + _ => new UnversionedIdentityMapClosedShapeStorage(mapping, descriptor), + }; + + private static DirtyCheckedClosedShapeStorage BuildDirtyCheckedStorage( + DocumentMapping mapping, DocumentStorageDescriptor descriptor) + where TDoc : notnull + where TId : notnull + => descriptor.ConcurrencyMode switch + { + ConcurrencyMode.Optimistic => new OptimisticDirtyCheckedClosedShapeStorage(mapping, descriptor), + ConcurrencyMode.Numeric => new NumericDirtyCheckedClosedShapeStorage(mapping, descriptor), + _ => new UnversionedDirtyCheckedClosedShapeStorage(mapping, descriptor), + }; } diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeUpdateOperation.cs b/src/Marten/Internal/ClosedShape/ClosedShapeUpdateOperation.cs index 421be4db41..1a6d575b3a 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeUpdateOperation.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeUpdateOperation.cs @@ -4,10 +4,6 @@ using System.Data.Common; using System.Threading; using System.Threading.Tasks; -using JasperFx; -using JasperFx.Core; -using Marten.Exceptions; -using Marten.Internal; using Marten.Internal.Operations; using Npgsql; using NpgsqlTypes; @@ -17,52 +13,46 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M3+M7+M8): hand-written Update operation. Emits -/// UPDATE … SET data = ?, … WHERE id = ? [and tenant_id = ?] [and concurrency-guard] RETURNING {id|mt_version}. +/// Abstract base for the per- closed-shape +/// Update operation. Sealed subclasses provide the concrete +/// + bodies +/// so the hot path doesn't branch on ConcurrencyMode (#4659). /// /// -/// Postprocess branches on : +/// Subclasses: /// -/// Off: a missing row raises . -/// Optimistic: a missing row raises . -/// Numeric: a missing row raises unless -/// is set. +/// — +/// missing row → . +/// — +/// missing row → +/// (unless ). +/// — +/// same as Optimistic but with a revision CASE/WHERE block. /// /// -internal sealed class ClosedShapeUpdateOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform +internal abstract class ClosedShapeUpdateOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform where TDoc : notnull where TId : notnull { - public TId Id => _id; - - private readonly TDoc _document; - private readonly TId _id; - private readonly string _tenantId; - private readonly DocumentStorageDescriptor _descriptor; - private readonly Dictionary? _versions; - private readonly Dictionary? _revisions; - private readonly Guid _newVersion; + protected readonly TDoc _document; + protected readonly TId _id; + protected readonly string _tenantId; + protected readonly DocumentStorageDescriptor _descriptor; - public ClosedShapeUpdateOperation( + protected ClosedShapeUpdateOperation( TDoc document, TId id, string tenantId, - DocumentStorageDescriptor descriptor, - Dictionary? versions, - Dictionary? revisions) + DocumentStorageDescriptor descriptor) { _document = document; _id = id; _tenantId = tenantId; _descriptor = descriptor; - _versions = versions; - _revisions = revisions; - if (descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic) - { - _newVersion = CombGuidIdGeneration.NewGuid(); - } } + public TId Id => _id; + public long Revision { get; set; } public bool IgnoreConcurrencyViolation { get; set; } @@ -76,16 +66,24 @@ public Marten.Internal.DirtyTracking.IChangeTracker ToTracker(IMartenSession ses public OperationRole Role() => OperationRole.Update; - public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + public abstract void ConfigureCommand(ICommandBuilder builder, IMartenSession session); + + public abstract Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token); + + /// + /// Bind data + client-side binders + id [+ tenant_id] [+ partition PK + /// binders], stopping at the trailing concurrency WHERE slot. Returns + /// the slot index for the concurrency-guard parameters that each + /// concurrency-specific leaf appends. + /// + /// + /// Bug #4223: partitioned tables include the partition column(s) in + /// the PK, so the WHERE clause adds a {col} = ? slot per + /// partition column. Without this we'd update every row matching + /// id = ? across partitions. + /// + protected int BindPreConcurrencyParameters(NpgsqlParameter[] parameters, IMartenSession session) { - // Update SQL ordering: - // data (0), binders (1+), id (n), [tenant_id], [guard params] - // Numeric: revision binder consumes 2 slots in SET; WHERE adds 2 - // more slots for the same Revision value. - var parameters = builder.AppendWithParameters(_descriptor.UpdateSql, '?'); - - // Project session-derived metadata onto the document BEFORE - // serialization so the values flow into the JSON data column. foreach (var binder in _descriptor.WriteBinders) { binder.ApplyToDocument(_document, session); @@ -96,7 +94,7 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) var slot = 1; foreach (var binder in _descriptor.ClientSideWriteBinders) { - slot = BindBinder(parameters, slot, binder, session); + slot = BindClientSideBinder(parameters, slot, binder, session); } parameters[slot].Value = _descriptor.Identification.ToRawSqlValue(_id); @@ -110,108 +108,22 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) slot++; } - // Bug #4223: partitioned tables include the partition column in - // the PK; the Update WHERE clause adds a ` = ?` slot per - // partition column so we update exactly the right row, not - // every row matching `id = ?`. foreach (var pk in _descriptor.PartitionPkBinders) { pk.BindParameter(parameters[slot], _document, session); slot++; } - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic) - { - if (_versions!.TryGetValue(_id, out var expected)) - { - parameters[slot].Value = expected; - } - else - { - parameters[slot].Value = DBNull.Value; - } - parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; - } - else if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric) - { - // WHERE (? = 0 or {table}.mt_version < ?) — bind raw Revision to both slots. - // #4614: parameter type tracks the column width (integer/bigint). - var revisionDbType = _descriptor.RevisionBinder!.ColumnDbType; - var revisionValue = revisionDbType == NpgsqlDbType.Integer - ? (object)checked((int)Revision) - : Revision; - parameters[slot].Value = revisionValue; - parameters[slot].NpgsqlDbType = revisionDbType; - parameters[slot + 1].Value = revisionValue; - parameters[slot + 1].NpgsqlDbType = revisionDbType; - } - } - - public async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) - { - if (!await reader.ReadAsync(token).ConfigureAwait(false)) - { - if (!IgnoreConcurrencyViolation) - { - exceptions.Add(MissingRowException()); - } - return; - } - - ApplyConcurrencyResult(reader); + return slot; } - private Exception MissingRowException() - => _descriptor.ConcurrencyMode == ConcurrencyMode.Off - ? new NonExistentDocumentException(typeof(TDoc), _id) - : new ConcurrencyException(typeof(TDoc), _id); - - private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) - { - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic && - ReferenceEquals(binder, _descriptor.VersionBinder)) - { - parameters[slot].Value = _newVersion; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; - _descriptor.VersionBinder.ApplyVersionTo(_document, _newVersion); - return slot + 1; - } - - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric && - ReferenceEquals(binder, _descriptor.RevisionBinder)) - { - // SET mt_version = CASE WHEN ? = 0 THEN current+1 ELSE ? END - // #4614: parameter type tracks the column width (integer/bigint). - var revisionDbType = _descriptor.RevisionBinder.ColumnDbType; - var revisionValue = revisionDbType == NpgsqlDbType.Integer - ? (object)checked((int)Revision) - : Revision; - parameters[slot].Value = revisionValue; - parameters[slot].NpgsqlDbType = revisionDbType; - parameters[slot + 1].Value = revisionValue; - parameters[slot + 1].NpgsqlDbType = revisionDbType; - return slot + 2; - } + /// + /// Bind a single client-side write binder; concurrency-aware + /// subclasses override this to special-case the VersionBinder / + /// RevisionBinder. + /// + protected abstract int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session); - binder.BindParameter(parameters[slot], _document, session); - return slot + 1; - } - - private void ApplyConcurrencyResult(DbDataReader reader) - { - switch (_descriptor.ConcurrencyMode) - { - case ConcurrencyMode.Optimistic: - _versions![_id] = _newVersion; - break; - case ConcurrencyMode.Numeric: - var newRevision = reader.GetFieldValue(0); - _revisions![_id] = newRevision; - _descriptor.RevisionBinder?.ApplyRevisionTo(_document, newRevision); - break; - } - } public bool TryTransform(System.Exception original, out System.Exception? transformed) => ClosedShapeOperationExceptionTransform.TryTransform(original, _descriptor.TableName, typeof(TDoc), _id!, out transformed); - } diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeUpsertOperation.cs b/src/Marten/Internal/ClosedShape/ClosedShapeUpsertOperation.cs index f8c857bf6e..3418ebc767 100644 --- a/src/Marten/Internal/ClosedShape/ClosedShapeUpsertOperation.cs +++ b/src/Marten/Internal/ClosedShape/ClosedShapeUpsertOperation.cs @@ -4,10 +4,6 @@ using System.Data.Common; using System.Threading; using System.Threading.Tasks; -using JasperFx; -using JasperFx.Core; -using Marten.Exceptions; -using Marten.Internal; using Marten.Internal.Operations; using Npgsql; using NpgsqlTypes; @@ -17,60 +13,43 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M1+M7+M8): hand-written upsert operation that consumes the -/// descriptor's pre-built SQL + client-side binder array. +/// Abstract base for the per- closed-shape +/// Upsert operation. Sealed subclasses provide the concrete +/// + bodies +/// so the hot path doesn't branch on ConcurrencyMode (#4659). /// /// -/// Concurrency variants: -/// -/// Optimistic: ON CONFLICT DO UPDATE adds where mt_version = ? -/// (caller-supplied expected version). A mismatch produces no -/// RETURNING row → . -/// Numeric: revision binder writes -/// CASE WHEN ? = 0 THEN current+1 ELSE ? END; ON CONFLICT -/// guard is ? = 0 OR current < supplied. A failed guard -/// surfaces as unless -/// is -/// set. -/// +/// The Upsert is supplied by the caller +/// (Upsert or Insert) so the dirty-tracking change tracker can register +/// the right kind of op against the session. Subclasses inherit the same +/// role choice. /// -internal sealed class ClosedShapeUpsertOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform +internal abstract class ClosedShapeUpsertOperation: IDocumentStorageOperation, IRevisionedOperation, IIdentifiedOperation, JasperFx.Core.Exceptions.IExceptionTransform where TDoc : notnull where TId : notnull { - public TId Id => _id; - - private readonly TDoc _document; - private readonly TId _id; - private readonly string _tenantId; - private readonly DocumentStorageDescriptor _descriptor; - private readonly OperationRole _role; - private readonly Dictionary? _versions; - private readonly Dictionary? _revisions; - private readonly Guid _newVersion; + protected readonly TDoc _document; + protected readonly TId _id; + protected readonly string _tenantId; + protected readonly DocumentStorageDescriptor _descriptor; + protected readonly OperationRole _role; - public ClosedShapeUpsertOperation( + protected ClosedShapeUpsertOperation( TDoc document, TId id, string tenantId, DocumentStorageDescriptor descriptor, - OperationRole role, - Dictionary? versions, - Dictionary? revisions) + OperationRole role) { _document = document; _id = id; _tenantId = tenantId; _descriptor = descriptor; _role = role; - _versions = versions; - _revisions = revisions; - if (descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic) - { - _newVersion = CombGuidIdGeneration.NewGuid(); - } } + public TId Id => _id; + public long Revision { get; set; } public bool IgnoreConcurrencyViolation { get; set; } @@ -84,13 +63,17 @@ public Marten.Internal.DirtyTracking.IChangeTracker ToTracker(IMartenSession ses public OperationRole Role() => _role; - public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) - { - // Upsert SQL ordering (non-conjoined, Numeric): - // id, data, rev (INSERT CASE × 2), [other binders], - // rev (SET CASE × 2), rev (WHERE × 2) - var parameters = builder.AppendWithParameters(_descriptor.UpsertSql, '?'); + public abstract void ConfigureCommand(ICommandBuilder builder, IMartenSession session); + + public abstract Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token); + /// + /// Bind [tenant_id,] id, data + the client-side write binders + /// up to (not including) the trailing ON CONFLICT concurrency-extras + /// slots. Returns the next free parameter slot. + /// + protected int BindPreOnConflictParameters(NpgsqlParameter[] parameters, IMartenSession session) + { var slot = 0; if (_descriptor.IsConjoined) { @@ -103,8 +86,6 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); slot++; - // Project session-derived metadata onto the document BEFORE - // serialization so the values flow into the JSON data column. foreach (var binder in _descriptor.WriteBinders) { binder.ApplyToDocument(_document, session); @@ -115,127 +96,39 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) foreach (var binder in _descriptor.ClientSideWriteBinders) { - slot = BindBinder(parameters, slot, binder, session); + slot = BindClientSideBinder(parameters, slot, binder, session); } - // ON CONFLICT side concurrency-related extras. - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic) - { - // ON CONFLICT DO UPDATE ... WHERE table.mt_version = ? - if (_versions!.TryGetValue(_id, out var expected)) - { - parameters[slot].Value = expected; - } - else - { - parameters[slot].Value = DBNull.Value; - } - parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; - } - else if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric) - { - // DO UPDATE SET mt_version = CASE WHEN ? = 0 THEN current+1 ELSE ? END - // WHERE ? = 0 OR table.mt_version < ? - for (var i = 0; i < 4; i++) - { - parameters[slot + i].Value = Revision; - parameters[slot + i].NpgsqlDbType = NpgsqlDbType.Bigint; - } - } + return slot; } - public async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + /// + /// Bind the optional id [, tenant_id] subquery slots that + /// UseVersionFromMatchingStream emits inside the revision CASE + /// expression. Common to Numeric Upsert / Overwrite. + /// + protected int BindUseVersionFromMatchingStreamSubquery(NpgsqlParameter[] parameters, int slot) { - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Off) - { - // Mode Off Upsert is fire-and-forget today — RETURNING id is - // there for symmetry with Insert/Update but the result isn't - // inspected. - return; - } - - if (!await reader.ReadAsync(token).ConfigureAwait(false)) - { - if (!IgnoreConcurrencyViolation) - { - exceptions.Add(new ConcurrencyException(typeof(TDoc), _id)); - } - return; - } - - ApplyConcurrencyResult(reader); - } - - private int BindBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) - { - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic && - ReferenceEquals(binder, _descriptor.VersionBinder)) - { - parameters[slot].Value = _newVersion; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; - _descriptor.VersionBinder.ApplyVersionTo(_document, _newVersion); - return slot + 1; - } + parameters[slot].Value = _descriptor.Identification.ToRawSqlValue(_id); + parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); + slot++; - if (_descriptor.ConcurrencyMode == ConcurrencyMode.Numeric && - ReferenceEquals(binder, _descriptor.RevisionBinder)) + if (_descriptor.IsConjoined) { - // INSERT VALUES side; two layouts depending on UseVersionFromMatchingStream: - // Default: CASE WHEN ? = 0 THEN 1 ELSE ? END (2 slots) - // UseVersionFromMatchingStream (non-conjoined): CASE WHEN ? = 0 - // THEN COALESCE((select version from .mt_streams where id = ?), 1) - // ELSE ? END (3 slots) - // UseVersionFromMatchingStream + conjoined: same with extra ? for tenant_id (4 slots) - // The ON CONFLICT SET / WHERE branches always reference {table}.id - // directly so they keep their 4 revision slots downstream. - // #4614: the parameter type follows the column width (integer for IRevisioned, - // bigint for ILongVersioned) so the CASE expression's branch types align. - var revisionDbType = _descriptor.RevisionBinder.ColumnDbType; - var revisionValue = revisionDbType == NpgsqlDbType.Integer - ? (object)checked((int)Revision) - : Revision; - parameters[slot].Value = revisionValue; - parameters[slot].NpgsqlDbType = revisionDbType; + parameters[slot].Value = _tenantId; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Varchar; slot++; - - if (_descriptor.UseVersionFromMatchingStream) - { - parameters[slot].Value = _descriptor.Identification.ToRawSqlValue(_id); - parameters[slot].NpgsqlDbType = PostgresqlProvider.Instance.ToParameterType(_descriptor.Identification.RawSqlType); - slot++; - - if (_descriptor.IsConjoined) - { - parameters[slot].Value = _tenantId; - parameters[slot].NpgsqlDbType = NpgsqlDbType.Varchar; - slot++; - } - } - - parameters[slot].Value = revisionValue; - parameters[slot].NpgsqlDbType = revisionDbType; - return slot + 1; } - binder.BindParameter(parameters[slot], _document, session); - return slot + 1; + return slot; } - private void ApplyConcurrencyResult(DbDataReader reader) - { - switch (_descriptor.ConcurrencyMode) - { - case ConcurrencyMode.Optimistic: - _versions![_id] = _newVersion; - break; - case ConcurrencyMode.Numeric: - var newRevision = reader.GetFieldValue(0); - _revisions![_id] = newRevision; - _descriptor.RevisionBinder?.ApplyRevisionTo(_document, newRevision); - break; - } - } + /// + /// Concurrency-aware subclasses override to special-case the + /// VersionBinder / RevisionBinder. + /// + protected abstract int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session); + public bool TryTransform(System.Exception original, out System.Exception? transformed) => ClosedShapeOperationExceptionTransform.TryTransform(original, _descriptor.TableName, typeof(TDoc), _id!, out transformed); - } diff --git a/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs index 483403056e..02935e5773 100644 --- a/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs @@ -9,18 +9,18 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M2+M4+M7): hand-written, closed-shape -/// for any -/// . Selected when a session opens with -/// DocumentTracking.DirtyTracking. +/// Closed-shape base. +/// Sealed concurrency-mode leaves provide the write factories + +/// BuildSelector. See +/// for the rationale (#4659). /// -public sealed class DirtyCheckedClosedShapeStorage: DirtyCheckedDocumentStorage +public abstract class DirtyCheckedClosedShapeStorage: DirtyCheckedDocumentStorage where TDoc : notnull where TId : notnull { - private readonly DocumentStorageDescriptor _descriptor; + protected readonly DocumentStorageDescriptor _descriptor; - public DirtyCheckedClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + protected DirtyCheckedClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) : base(mapping) { _descriptor = descriptor; @@ -37,32 +37,4 @@ public override object RawIdentityValue(TId id) public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids) => ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification); - - public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation OverwriteProjected(TDoc document, string tenant) - => new ClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null, null); - - public override ISelector BuildSelector(IMartenSession session) - => new ClosedShapeDirtyTrackingSelector(session, _descriptor); - - private System.Collections.Generic.Dictionary? VersionsFor(IMartenSession session) - => _descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic - ? session.Versions.ForType() - : null; - - private System.Collections.Generic.Dictionary? RevisionsFor(IMartenSession session) - => _descriptor.ConcurrencyMode == ConcurrencyMode.Numeric - ? session.Versions.RevisionsFor() - : null; } diff --git a/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs index 268682b783..15273fc01a 100644 --- a/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs @@ -9,18 +9,18 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M2+M4+M7): hand-written, closed-shape -/// for any -/// . Selected when a session opens with -/// DocumentTracking.IdentityOnly. +/// Closed-shape base. +/// Sealed concurrency-mode leaves provide the write factories + +/// BuildSelector. See +/// for the rationale (#4659). /// -public sealed class IdentityMapClosedShapeStorage: IdentityMapDocumentStorage +public abstract class IdentityMapClosedShapeStorage: IdentityMapDocumentStorage where TDoc : notnull where TId : notnull { - private readonly DocumentStorageDescriptor _descriptor; + protected readonly DocumentStorageDescriptor _descriptor; - public IdentityMapClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + protected IdentityMapClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) : base(mapping) { _descriptor = descriptor; @@ -37,32 +37,4 @@ public override object RawIdentityValue(TId id) public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids) => ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification); - - public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation OverwriteProjected(TDoc document, string tenant) - => new ClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null, null); - - public override ISelector BuildSelector(IMartenSession session) - => new ClosedShapeIdentityMapSelector(session, _descriptor); - - private System.Collections.Generic.Dictionary? VersionsFor(IMartenSession session) - => _descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic - ? session.Versions.ForType() - : null; - - private System.Collections.Generic.Dictionary? RevisionsFor(IMartenSession session) - => _descriptor.ConcurrencyMode == ConcurrencyMode.Numeric - ? session.Versions.RevisionsFor() - : null; } diff --git a/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs index bc0da27efa..61ee51dff0 100644 --- a/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs @@ -9,36 +9,31 @@ namespace Marten.Internal.ClosedShape; /// -/// W3 spike (M2+M4+M7): hand-written, closed-shape -/// for any -/// . Composes -/// (SQL + metadata -/// binders) and (identity -/// strategy). The closed-shape JIT specialization happens at -/// construction time per (TDoc, TId) closure — no runtime -/// branching after that. +/// Closed-shape base. +/// Holds the shared infrastructure (Identity / AssignIdentity / +/// RawIdentityValue / BuildManyIdParameter) common to every concurrency +/// flavor; concrete subclasses provide the Insert / Update / Upsert / +/// Overwrite factories + BuildSelector so the storage class is +/// monomorphic-by-construction per (TDoc, TId, ConcurrencyMode) +/// closure (#4659). /// /// /// -/// One cell of the planned W3 matrix: Lightweight + any id type + any -/// concurrency mode + no revisions + no tenancy + no hierarchical. -/// -/// -/// Inheriting picks up -/// Store / Eject / LoadAsync / LoadManyAsync. What we hand-write here: -/// Identity / AssignIdentity (via the descriptor's -/// ), Insert / Update / Upsert / -/// Overwrite (return the corresponding closed-shape operation), and -/// BuildSelector (returns ). +/// Public sealed → public abstract. The class still exists as a public +/// type but cannot be instantiated directly; consumers go through +/// which builds the right +/// concurrency-mode leaf. The W3 spike's Use…ClosedShape +/// extension helpers still work — the registration internals just +/// dispatch on . /// /// -public sealed class LightweightClosedShapeStorage: LightweightDocumentStorage +public abstract class LightweightClosedShapeStorage: LightweightDocumentStorage where TDoc : notnull where TId : notnull { - private readonly DocumentStorageDescriptor _descriptor; + protected readonly DocumentStorageDescriptor _descriptor; - public LightweightClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + protected LightweightClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) : base(mapping) { _descriptor = descriptor; @@ -57,32 +52,4 @@ public override object RawIdentityValue(TId id) public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids) => ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification); - - public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) - => new ClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, VersionsFor(session), RevisionsFor(session)); - - public override IStorageOperation OverwriteProjected(TDoc document, string tenant) - => new ClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null, null); - - public override ISelector BuildSelector(IMartenSession session) - => new ClosedShapeLightweightSelector(session, _descriptor); - - private System.Collections.Generic.Dictionary? VersionsFor(IMartenSession session) - => _descriptor.ConcurrencyMode == ConcurrencyMode.Optimistic - ? session.Versions.ForType() - : null; - - private System.Collections.Generic.Dictionary? RevisionsFor(IMartenSession session) - => _descriptor.ConcurrencyMode == ConcurrencyMode.Numeric - ? session.Versions.RevisionsFor() - : null; } diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeDirtyTrackingSelector.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeDirtyTrackingSelector.cs new file mode 100644 index 0000000000..047497d0f9 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeDirtyTrackingSelector.cs @@ -0,0 +1,33 @@ +#nullable enable +using System.Collections.Generic; +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric DirtyTracking selector. CaptureVersion +/// writes the row's mt_version (long) into the session's per-type +/// revision dict. #4659 leaf. +/// +internal sealed class NumericClosedShapeDirtyTrackingSelector: ClosedShapeDirtyTrackingSelector + where T : notnull + where TId : notnull +{ + private readonly Dictionary _revisions; + + public NumericClosedShapeDirtyTrackingSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + _revisions = session.Versions.RevisionsFor(); + } + + protected override void CaptureVersion(DbDataReader reader, TId id) + { + var versionIndex = _descriptor.VersionReadIndex; + if (versionIndex < 0) return; + var versionOrdinal = FirstMetadataColumn + versionIndex; + if (reader.IsDBNull(versionOrdinal)) return; + + _revisions[id] = reader.GetFieldValue(versionOrdinal); + } +} diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeIdentityMapSelector.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeIdentityMapSelector.cs new file mode 100644 index 0000000000..c5a5f3423d --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeIdentityMapSelector.cs @@ -0,0 +1,33 @@ +#nullable enable +using System.Collections.Generic; +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric IdentityMap selector. CaptureVersion +/// writes the row's mt_version (long) into the session's per-type +/// revision dict. #4659 leaf. +/// +internal sealed class NumericClosedShapeIdentityMapSelector: ClosedShapeIdentityMapSelector + where T : notnull + where TId : notnull +{ + private readonly Dictionary _revisions; + + public NumericClosedShapeIdentityMapSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + _revisions = session.Versions.RevisionsFor(); + } + + protected override void CaptureVersion(DbDataReader reader, TId id) + { + var versionIndex = _descriptor.VersionReadIndex; + if (versionIndex < 0) return; + var versionOrdinal = FirstMetadataColumn + versionIndex; + if (reader.IsDBNull(versionOrdinal)) return; + + _revisions[id] = reader.GetFieldValue(versionOrdinal); + } +} diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeInsertOperation.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeInsertOperation.cs new file mode 100644 index 0000000000..15446bd497 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeInsertOperation.cs @@ -0,0 +1,108 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using Marten.Exceptions; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric closed-shape Insert. Binds the revision +/// CASE WHEN ? = 0 THEN … ELSE ? END block (2–4 parameter slots +/// depending on +/// + ), +/// reads the resolved revision out of the RETURNING row, and writes it +/// back through +/// + the session's per-type revision tracker. #4659 leaf. +/// +internal sealed class NumericClosedShapeInsertOperation: ClosedShapeInsertOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary _revisions; + + public NumericClosedShapeInsertOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + Dictionary revisions) + : base(document, id, tenantId, descriptor) + { + _revisions = revisions; + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.InsertSql, '?'); + var slot = BindLeadingParameters(parameters, session); + + foreach (var binder in _descriptor.ClientSideWriteBinders) + { + if (ReferenceEquals(binder, _descriptor.RevisionBinder)) + { + slot = BindRevisionBlock(parameters, slot); + } + else + { + binder.BindParameter(parameters[slot], _document, session); + slot++; + } + } + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + exceptions.Add(new DocumentAlreadyExistsException(null, typeof(TDoc), _id)); + return; + } + + var newRevision = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + _revisions[_id] = newRevision; + _descriptor.RevisionBinder!.ApplyRevisionTo(_document, newRevision); + } + + /// + /// Numeric Insert binds the revision CASE expression in two/three/four + /// slots: + /// + /// Default: CASE WHEN ? = 0 THEN 1 ELSE ? END (2 slots). + /// UseVersionFromMatchingStream (non-conjoined): + /// CASE WHEN ? = 0 THEN COALESCE((select version from mt_streams where id = ?), 1) ELSE ? END + /// (3 slots — the ? = 0 check, the id subquery, the explicit revision). + /// UseVersionFromMatchingStream + IsConjoined: extra ? for + /// tenant_id inside the subquery (4 slots). + /// + /// #4614: the parameter type tracks the column width (integer vs bigint) + /// so the CASE branch types align. + /// + private int BindRevisionBlock(NpgsqlParameter[] parameters, int slot) + { + var revisionDbType = _descriptor.RevisionBinder!.ColumnDbType; + var revisionValue = revisionDbType == NpgsqlDbType.Integer + ? (object)checked((int)Revision) + : Revision; + + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + slot++; + + if (_descriptor.UseVersionFromMatchingStream) + { + slot = BindUseVersionFromMatchingStreamSubquery(parameters, slot); + } + + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeLightweightSelector.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeLightweightSelector.cs new file mode 100644 index 0000000000..ed97b2edbd --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeLightweightSelector.cs @@ -0,0 +1,33 @@ +#nullable enable +using System.Collections.Generic; +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric Lightweight selector. Captures the +/// row's mt_version (long) into the session's per-type revision dict so +/// subsequent updates can supply it as the expected revision. #4659 leaf. +/// +internal sealed class NumericClosedShapeLightweightSelector: ClosedShapeLightweightSelector + where T : notnull + where TId : notnull +{ + private readonly Dictionary _revisions; + + public NumericClosedShapeLightweightSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + _revisions = session.Versions.RevisionsFor(); + } + + protected override void CaptureVersion(DbDataReader reader, TId id) + { + var versionIndex = _descriptor.VersionReadIndex; + if (versionIndex < 0) return; + var versionOrdinal = FirstMetadataColumn + versionIndex; + if (reader.IsDBNull(versionOrdinal)) return; + + _revisions[id] = reader.GetFieldValue(versionOrdinal); + } +} diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeOverwriteOperation.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeOverwriteOperation.cs new file mode 100644 index 0000000000..351b69bf69 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeOverwriteOperation.cs @@ -0,0 +1,94 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric closed-shape Overwrite. Same INSERT +/// VALUES revision CASE block as the Numeric Upsert path, but the SET +/// CASE is 2 slots (no WHERE guard). #4658 — the +/// path passes +/// a null revisions tracker so the projection doesn't poison the +/// session's revision dictionary. #4659 leaf. +/// +internal sealed class NumericClosedShapeOverwriteOperation: ClosedShapeOverwriteOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary? _revisions; + + public NumericClosedShapeOverwriteOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + Dictionary? revisions) + : base(document, id, tenantId, descriptor) + { + _revisions = revisions; + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.OverwriteSql, '?'); + var slot = BindPreOnConflictParameters(parameters, session); + + // DO UPDATE SET mt_version = CASE WHEN ? = 0 THEN current+1 ELSE ? END + // No WHERE guard — Overwrite always wins. + parameters[slot].Value = Revision; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Bigint; + parameters[slot + 1].Value = Revision; + parameters[slot + 1].NpgsqlDbType = NpgsqlDbType.Bigint; + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (await reader.ReadAsync(token).ConfigureAwait(false)) + { + var newRevision = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + // #4658 — null tracker (OverwriteProjected) just skips the + // tracker write. RevisionBinder write still happens so the + // document's revision field is fresh. + if (_revisions is not null) + { + _revisions[_id] = newRevision; + } + _descriptor.RevisionBinder!.ApplyRevisionTo(_document, newRevision); + } + } + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + if (ReferenceEquals(binder, _descriptor.RevisionBinder)) + { + // INSERT VALUES CASE block — same shape as NumericClosedShapeUpsertOperation. + var revisionDbType = _descriptor.RevisionBinder!.ColumnDbType; + var revisionValue = revisionDbType == NpgsqlDbType.Integer + ? (object)checked((int)Revision) + : Revision; + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + slot++; + + if (_descriptor.UseVersionFromMatchingStream) + { + slot = BindUseVersionFromMatchingStreamSubquery(parameters, slot); + } + + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + return slot + 1; + } + + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeUpdateOperation.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpdateOperation.cs new file mode 100644 index 0000000000..b109d0bb4c --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpdateOperation.cs @@ -0,0 +1,93 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using Marten.Exceptions; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric closed-shape Update. Two revision +/// slots in the SET CASE expression (CASE WHEN ? = 0 THEN +/// current+1 ELSE ? END) plus two in the trailing WHERE clause +/// (? = 0 OR table.mt_version < ?). Missing RETURNING row → +/// revision-guard failure → . #4659 leaf. +/// +internal sealed class NumericClosedShapeUpdateOperation: ClosedShapeUpdateOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary _revisions; + + public NumericClosedShapeUpdateOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + Dictionary revisions) + : base(document, id, tenantId, descriptor) + { + _revisions = revisions; + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.UpdateSql, '?'); + var slot = BindPreConcurrencyParameters(parameters, session); + + // Trailing WHERE (? = 0 or {table}.mt_version < ?) — bind the raw + // Revision to both slots. #4614: parameter type tracks column width. + var revisionDbType = _descriptor.RevisionBinder!.ColumnDbType; + var revisionValue = revisionDbType == NpgsqlDbType.Integer + ? (object)checked((int)Revision) + : Revision; + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + parameters[slot + 1].Value = revisionValue; + parameters[slot + 1].NpgsqlDbType = revisionDbType; + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + if (!IgnoreConcurrencyViolation) + { + exceptions.Add(new ConcurrencyException(typeof(TDoc), _id)); + } + return; + } + + var newRevision = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + _revisions[_id] = newRevision; + _descriptor.RevisionBinder!.ApplyRevisionTo(_document, newRevision); + } + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + if (ReferenceEquals(binder, _descriptor.RevisionBinder)) + { + // SET mt_version = CASE WHEN ? = 0 THEN current+1 ELSE ? END + // (2 slots — Update side never uses UseVersionFromMatchingStream) + var revisionDbType = _descriptor.RevisionBinder!.ColumnDbType; + var revisionValue = revisionDbType == NpgsqlDbType.Integer + ? (object)checked((int)Revision) + : Revision; + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + parameters[slot + 1].Value = revisionValue; + parameters[slot + 1].NpgsqlDbType = revisionDbType; + return slot + 2; + } + + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeUpsertOperation.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpsertOperation.cs new file mode 100644 index 0000000000..9e9ef81666 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpsertOperation.cs @@ -0,0 +1,102 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using Marten.Exceptions; +using Marten.Internal.Operations; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric closed-shape Upsert. Two/three/four +/// revision slots in the INSERT VALUES CASE block (depending on +/// +/// + ) plus +/// four more in the ON CONFLICT DO UPDATE SET/WHERE block. Missing +/// RETURNING row → revision-guard failure → +/// . #4659 leaf. +/// +internal sealed class NumericClosedShapeUpsertOperation: ClosedShapeUpsertOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary _revisions; + + public NumericClosedShapeUpsertOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + OperationRole role, + Dictionary revisions) + : base(document, id, tenantId, descriptor, role) + { + _revisions = revisions; + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.UpsertSql, '?'); + var slot = BindPreOnConflictParameters(parameters, session); + + // ON CONFLICT trailing: + // DO UPDATE SET mt_version = CASE WHEN ? = 0 THEN current+1 ELSE ? END + // WHERE ? = 0 OR table.mt_version < ? + for (var i = 0; i < 4; i++) + { + parameters[slot + i].Value = Revision; + parameters[slot + i].NpgsqlDbType = NpgsqlDbType.Bigint; + } + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + if (!IgnoreConcurrencyViolation) + { + exceptions.Add(new ConcurrencyException(typeof(TDoc), _id)); + } + return; + } + + var newRevision = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + _revisions[_id] = newRevision; + _descriptor.RevisionBinder!.ApplyRevisionTo(_document, newRevision); + } + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + if (ReferenceEquals(binder, _descriptor.RevisionBinder)) + { + // INSERT VALUES CASE block — see NumericClosedShapeInsertOperation + // for the slot-count rationale. + var revisionDbType = _descriptor.RevisionBinder!.ColumnDbType; + var revisionValue = revisionDbType == NpgsqlDbType.Integer + ? (object)checked((int)Revision) + : Revision; + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + slot++; + + if (_descriptor.UseVersionFromMatchingStream) + { + slot = BindUseVersionFromMatchingStreamSubquery(parameters, slot); + } + + parameters[slot].Value = revisionValue; + parameters[slot].NpgsqlDbType = revisionDbType; + return slot + 1; + } + + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/NumericDirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/NumericDirtyCheckedClosedShapeStorage.cs new file mode 100644 index 0000000000..1ba215266a --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericDirtyCheckedClosedShapeStorage.cs @@ -0,0 +1,38 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric closed-shape DirtyChecked storage. #4659 leaf. +/// +internal sealed class NumericDirtyCheckedClosedShapeStorage: DirtyCheckedClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public NumericDirtyCheckedClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, session.Versions.RevisionsFor()); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + + public override ISelector BuildSelector(IMartenSession session) + => new NumericClosedShapeDirtyTrackingSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/NumericIdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/NumericIdentityMapClosedShapeStorage.cs new file mode 100644 index 0000000000..cee6edebba --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericIdentityMapClosedShapeStorage.cs @@ -0,0 +1,38 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric closed-shape IdentityMap storage. #4659 leaf. +/// +internal sealed class NumericIdentityMapClosedShapeStorage: IdentityMapClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public NumericIdentityMapClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, session.Versions.RevisionsFor()); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + + public override ISelector BuildSelector(IMartenSession session) + => new NumericClosedShapeIdentityMapSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/NumericLightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/NumericLightweightClosedShapeStorage.cs new file mode 100644 index 0000000000..db65d2c22c --- /dev/null +++ b/src/Marten/Internal/ClosedShape/NumericLightweightClosedShapeStorage.cs @@ -0,0 +1,43 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Numeric closed-shape Lightweight storage. +/// Factories construct Numeric… operation leaves and pass the +/// session's typed long revision tracker; BuildSelector returns the +/// numeric read selector. #4659 leaf. +/// +internal sealed class NumericLightweightClosedShapeStorage: LightweightClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public NumericLightweightClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, session.Versions.RevisionsFor()); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, session.Versions.RevisionsFor()); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + // #4658: projection path passes null tracker so it doesn't poison + // the session's numeric-revision map for the projected doc. + => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + + public override ISelector BuildSelector(IMartenSession session) + => new NumericClosedShapeLightweightSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeDirtyTrackingSelector.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeDirtyTrackingSelector.cs new file mode 100644 index 0000000000..05b39f6c1f --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeDirtyTrackingSelector.cs @@ -0,0 +1,34 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic DirtyTracking selector. +/// CaptureVersion writes the row's mt_version (Guid) into the session's +/// per-type version dict. #4659 leaf. +/// +internal sealed class OptimisticClosedShapeDirtyTrackingSelector: ClosedShapeDirtyTrackingSelector + where T : notnull + where TId : notnull +{ + private readonly Dictionary _versions; + + public OptimisticClosedShapeDirtyTrackingSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + _versions = session.Versions.ForType(); + } + + protected override void CaptureVersion(DbDataReader reader, TId id) + { + var versionIndex = _descriptor.VersionReadIndex; + if (versionIndex < 0) return; + var versionOrdinal = FirstMetadataColumn + versionIndex; + if (reader.IsDBNull(versionOrdinal)) return; + + _versions[id] = reader.GetFieldValue(versionOrdinal); + } +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeIdentityMapSelector.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeIdentityMapSelector.cs new file mode 100644 index 0000000000..eac16edb9d --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeIdentityMapSelector.cs @@ -0,0 +1,34 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic IdentityMap selector. CaptureVersion +/// writes the row's mt_version (Guid) into the session's per-type version +/// dict. #4659 leaf. +/// +internal sealed class OptimisticClosedShapeIdentityMapSelector: ClosedShapeIdentityMapSelector + where T : notnull + where TId : notnull +{ + private readonly Dictionary _versions; + + public OptimisticClosedShapeIdentityMapSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + _versions = session.Versions.ForType(); + } + + protected override void CaptureVersion(DbDataReader reader, TId id) + { + var versionIndex = _descriptor.VersionReadIndex; + if (versionIndex < 0) return; + var versionOrdinal = FirstMetadataColumn + versionIndex; + if (reader.IsDBNull(versionOrdinal)) return; + + _versions[id] = reader.GetFieldValue(versionOrdinal); + } +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeInsertOperation.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeInsertOperation.cs new file mode 100644 index 0000000000..6ce0426f71 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeInsertOperation.cs @@ -0,0 +1,74 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Core; +using Marten.Exceptions; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic closed-shape Insert. Generates a +/// fresh Guid version at construction time, binds it in place of the +/// VersionBinder's parameter slot, then writes it back through +/// + the +/// session's per-type version tracker on RETURNING success. #4659 leaf. +/// +internal sealed class OptimisticClosedShapeInsertOperation: ClosedShapeInsertOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary _versions; + private readonly Guid _newVersion; + + public OptimisticClosedShapeInsertOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + Dictionary versions) + : base(document, id, tenantId, descriptor) + { + _versions = versions; + _newVersion = CombGuidIdGeneration.NewGuid(); + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.InsertSql, '?'); + var slot = BindLeadingParameters(parameters, session); + + foreach (var binder in _descriptor.ClientSideWriteBinders) + { + if (ReferenceEquals(binder, _descriptor.VersionBinder)) + { + parameters[slot].Value = _newVersion; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; + _descriptor.VersionBinder!.ApplyVersionTo(_document, _newVersion); + } + else + { + binder.BindParameter(parameters[slot], _document, session); + } + slot++; + } + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + exceptions.Add(new DocumentAlreadyExistsException(null, typeof(TDoc), _id)); + return; + } + + _versions[_id] = _newVersion; + } +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeLightweightSelector.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeLightweightSelector.cs new file mode 100644 index 0000000000..32df9c914b --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeLightweightSelector.cs @@ -0,0 +1,34 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic Lightweight selector. Captures the +/// row's mt_version (Guid) into the session's per-type version dict so +/// subsequent updates can supply it as the expected version. #4659 leaf. +/// +internal sealed class OptimisticClosedShapeLightweightSelector: ClosedShapeLightweightSelector + where T : notnull + where TId : notnull +{ + private readonly Dictionary _versions; + + public OptimisticClosedShapeLightweightSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + _versions = session.Versions.ForType(); + } + + protected override void CaptureVersion(DbDataReader reader, TId id) + { + var versionIndex = _descriptor.VersionReadIndex; + if (versionIndex < 0) return; + var versionOrdinal = FirstMetadataColumn + versionIndex; + if (reader.IsDBNull(versionOrdinal)) return; + + _versions[id] = reader.GetFieldValue(versionOrdinal); + } +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeOverwriteOperation.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeOverwriteOperation.cs new file mode 100644 index 0000000000..2dcfb7985d --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeOverwriteOperation.cs @@ -0,0 +1,79 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Core; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic closed-shape Overwrite. Generates a +/// fresh Guid version, binds it for the SET VersionBinder slot, no +/// trailing WHERE guard (Overwrite explicitly skips the version check). +/// PostprocessAsync writes the new version to the session tracker on +/// RETURNING success; a missing row only happens when the tracker dict +/// was supplied null (the projection / fire-and-forget path) — silent +/// no-op. #4659 leaf. +/// +internal sealed class OptimisticClosedShapeOverwriteOperation: ClosedShapeOverwriteOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary? _versions; + private readonly Guid _newVersion; + + public OptimisticClosedShapeOverwriteOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + Dictionary? versions) + : base(document, id, tenantId, descriptor) + { + _versions = versions; + _newVersion = CombGuidIdGeneration.NewGuid(); + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.OverwriteSql, '?'); + BindPreOnConflictParameters(parameters, session); + // Overwrite drops the trailing concurrency-guard slot vs Upsert. + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (await reader.ReadAsync(token).ConfigureAwait(false)) + { + // #4658 — null tracker (the OverwriteProjected path) just + // skips the tracker write. The fresh version is still + // applied to the document via VersionBinder during command + // configuration. + if (_versions is not null) + { + _versions[_id] = _newVersion; + } + } + } + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + if (ReferenceEquals(binder, _descriptor.VersionBinder)) + { + parameters[slot].Value = _newVersion; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; + _descriptor.VersionBinder!.ApplyVersionTo(_document, _newVersion); + return slot + 1; + } + + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpdateOperation.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpdateOperation.cs new file mode 100644 index 0000000000..cdaed99fd3 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpdateOperation.cs @@ -0,0 +1,88 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Core; +using Marten.Exceptions; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic closed-shape Update. Generates a +/// fresh Guid version at construction time, binds it for the SET +/// VersionBinder slot, and binds the caller's expected Guid version +/// (from the session tracker, or DBNull if unknown) at the trailing +/// WHERE concurrency slot. Missing RETURNING row → version mismatch → +/// . #4659 leaf. +/// +internal sealed class OptimisticClosedShapeUpdateOperation: ClosedShapeUpdateOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary _versions; + private readonly Guid _newVersion; + + public OptimisticClosedShapeUpdateOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + Dictionary versions) + : base(document, id, tenantId, descriptor) + { + _versions = versions; + _newVersion = CombGuidIdGeneration.NewGuid(); + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.UpdateSql, '?'); + var slot = BindPreConcurrencyParameters(parameters, session); + + // Trailing WHERE mt_version = ? guard. + if (_versions.TryGetValue(_id, out var expected)) + { + parameters[slot].Value = expected; + } + else + { + parameters[slot].Value = DBNull.Value; + } + parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + if (!IgnoreConcurrencyViolation) + { + exceptions.Add(new ConcurrencyException(typeof(TDoc), _id)); + } + return; + } + + _versions[_id] = _newVersion; + } + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + if (ReferenceEquals(binder, _descriptor.VersionBinder)) + { + parameters[slot].Value = _newVersion; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; + _descriptor.VersionBinder!.ApplyVersionTo(_document, _newVersion); + return slot + 1; + } + + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpsertOperation.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpsertOperation.cs new file mode 100644 index 0000000000..95accfa033 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpsertOperation.cs @@ -0,0 +1,90 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Core; +using Marten.Exceptions; +using Marten.Internal.Operations; +using Npgsql; +using NpgsqlTypes; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic closed-shape Upsert. Generates a +/// fresh Guid version, binds it for the SET VersionBinder slot, and +/// binds the caller's expected Guid version at the trailing +/// ON CONFLICT DO UPDATE … WHERE table.mt_version = ? slot. +/// Missing RETURNING row → version mismatch → +/// . #4659 leaf. +/// +internal sealed class OptimisticClosedShapeUpsertOperation: ClosedShapeUpsertOperation + where TDoc : notnull + where TId : notnull +{ + private readonly Dictionary _versions; + private readonly Guid _newVersion; + + public OptimisticClosedShapeUpsertOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + OperationRole role, + Dictionary versions) + : base(document, id, tenantId, descriptor, role) + { + _versions = versions; + _newVersion = CombGuidIdGeneration.NewGuid(); + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.UpsertSql, '?'); + var slot = BindPreOnConflictParameters(parameters, session); + + // Trailing WHERE table.mt_version = ? guard. + if (_versions.TryGetValue(_id, out var expected)) + { + parameters[slot].Value = expected; + } + else + { + parameters[slot].Value = DBNull.Value; + } + parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + if (!IgnoreConcurrencyViolation) + { + exceptions.Add(new ConcurrencyException(typeof(TDoc), _id)); + } + return; + } + + _versions[_id] = _newVersion; + } + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + if (ReferenceEquals(binder, _descriptor.VersionBinder)) + { + parameters[slot].Value = _newVersion; + parameters[slot].NpgsqlDbType = NpgsqlDbType.Uuid; + _descriptor.VersionBinder!.ApplyVersionTo(_document, _newVersion); + return slot + 1; + } + + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticDirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/OptimisticDirtyCheckedClosedShapeStorage.cs new file mode 100644 index 0000000000..454a457051 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticDirtyCheckedClosedShapeStorage.cs @@ -0,0 +1,38 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic closed-shape DirtyChecked storage. #4659 leaf. +/// +internal sealed class OptimisticDirtyCheckedClosedShapeStorage: DirtyCheckedClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public OptimisticDirtyCheckedClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, session.Versions.ForType()); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + + public override ISelector BuildSelector(IMartenSession session) + => new OptimisticClosedShapeDirtyTrackingSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticIdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/OptimisticIdentityMapClosedShapeStorage.cs new file mode 100644 index 0000000000..c6d81d7024 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticIdentityMapClosedShapeStorage.cs @@ -0,0 +1,38 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic closed-shape IdentityMap storage. #4659 leaf. +/// +internal sealed class OptimisticIdentityMapClosedShapeStorage: IdentityMapClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public OptimisticIdentityMapClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, session.Versions.ForType()); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + + public override ISelector BuildSelector(IMartenSession session) + => new OptimisticClosedShapeIdentityMapSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/OptimisticLightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/OptimisticLightweightClosedShapeStorage.cs new file mode 100644 index 0000000000..46699535ec --- /dev/null +++ b/src/Marten/Internal/ClosedShape/OptimisticLightweightClosedShapeStorage.cs @@ -0,0 +1,43 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Optimistic closed-shape Lightweight storage. +/// Factories construct Optimistic… operation leaves and pass the +/// session's typed Guid version tracker; BuildSelector returns the +/// optimistic read selector. #4659 leaf. +/// +internal sealed class OptimisticLightweightClosedShapeStorage: LightweightClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public OptimisticLightweightClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, session.Versions.ForType()); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, session.Versions.ForType()); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + // #4658: projection path passes null tracker so it doesn't poison + // the session's optimistic-version map for the projected doc. + => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + + public override ISelector BuildSelector(IMartenSession session) + => new OptimisticClosedShapeLightweightSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedClosedShapeDirtyTrackingSelector.cs b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeDirtyTrackingSelector.cs new file mode 100644 index 0000000000..48cca8bd6f --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeDirtyTrackingSelector.cs @@ -0,0 +1,20 @@ +#nullable enable +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off DirtyTracking selector. CaptureVersion is +/// a no-op. #4659 leaf. +/// +internal sealed class UnversionedClosedShapeDirtyTrackingSelector: ClosedShapeDirtyTrackingSelector + where T : notnull + where TId : notnull +{ + public UnversionedClosedShapeDirtyTrackingSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + } + + protected override void CaptureVersion(DbDataReader reader, TId id) { /* no-op */ } +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedClosedShapeIdentityMapSelector.cs b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeIdentityMapSelector.cs new file mode 100644 index 0000000000..39c7259924 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeIdentityMapSelector.cs @@ -0,0 +1,20 @@ +#nullable enable +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off IdentityMap selector. CaptureVersion is a +/// no-op. #4659 leaf. +/// +internal sealed class UnversionedClosedShapeIdentityMapSelector: ClosedShapeIdentityMapSelector + where T : notnull + where TId : notnull +{ + public UnversionedClosedShapeIdentityMapSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + } + + protected override void CaptureVersion(DbDataReader reader, TId id) { /* no-op */ } +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedClosedShapeInsertOperation.cs b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeInsertOperation.cs new file mode 100644 index 0000000000..d33af9881d --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeInsertOperation.cs @@ -0,0 +1,56 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using Marten.Exceptions; +using Npgsql; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off closed-shape Insert. Hand-written for the +/// no-version-no-revision shape: bind [tenant_id,] id, data + the +/// client-side metadata binders, then on RETURNING expect a row (otherwise +/// raise ). No tracker writes, +/// no version/revision binder special-casing. #4659 leaf. +/// +internal sealed class UnversionedClosedShapeInsertOperation: ClosedShapeInsertOperation + where TDoc : notnull + where TId : notnull +{ + public UnversionedClosedShapeInsertOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor) + : base(document, id, tenantId, descriptor) + { + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.InsertSql, '?'); + var slot = BindLeadingParameters(parameters, session); + + // Off mode: every client-side write binder binds its single slot. + // No version or revision binder is in this array under Off mode. + foreach (var binder in _descriptor.ClientSideWriteBinders) + { + binder.BindParameter(parameters[slot], _document, session); + slot++; + } + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + exceptions.Add(new DocumentAlreadyExistsException(null, typeof(TDoc), _id)); + } + } +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedClosedShapeLightweightSelector.cs b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeLightweightSelector.cs new file mode 100644 index 0000000000..99e98515a9 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeLightweightSelector.cs @@ -0,0 +1,25 @@ +#nullable enable +using System.Data.Common; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off Lightweight selector. No version/revision +/// tracker — CaptureVersion is a no-op. #4659 leaf. +/// +internal sealed class UnversionedClosedShapeLightweightSelector: ClosedShapeLightweightSelector + where T : notnull + where TId : notnull +{ + public UnversionedClosedShapeLightweightSelector(IMartenSession session, DocumentStorageDescriptor descriptor) + : base(session, descriptor) + { + } + + protected override void CaptureVersion(DbDataReader reader, TId id) + { + // Off-mode: nothing to capture. The mt_version column may or may + // not be present in ReadBinders (depends on the user's mapping) + // but the session has no tracker to push it into. + } +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedClosedShapeOverwriteOperation.cs b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeOverwriteOperation.cs new file mode 100644 index 0000000000..f2c28cfe1c --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeOverwriteOperation.cs @@ -0,0 +1,47 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Npgsql; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off closed-shape Overwrite. Functionally +/// identical to +/// — no concurrency guard either way under Off. PostprocessAsync is +/// fire-and-forget. #4659 leaf. +/// +internal sealed class UnversionedClosedShapeOverwriteOperation: ClosedShapeOverwriteOperation + where TDoc : notnull + where TId : notnull +{ + public UnversionedClosedShapeOverwriteOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor) + : base(document, id, tenantId, descriptor) + { + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.OverwriteSql, '?'); + BindPreOnConflictParameters(parameters, session); + // Off-mode OverwriteSql has no trailing concurrency-extras slot. + } + + public override Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + => Task.CompletedTask; + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedClosedShapeUpdateOperation.cs b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeUpdateOperation.cs new file mode 100644 index 0000000000..aab96d49b0 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeUpdateOperation.cs @@ -0,0 +1,57 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Exceptions; +using Npgsql; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off closed-shape Update. Plain +/// WHERE id = ? [AND tenant_id = ?] [AND partition_col = ?…] — +/// no concurrency guard. A missing row signals the document didn't +/// exist; raises . #4659 leaf. +/// +internal sealed class UnversionedClosedShapeUpdateOperation: ClosedShapeUpdateOperation + where TDoc : notnull + where TId : notnull +{ + public UnversionedClosedShapeUpdateOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor) + : base(document, id, tenantId, descriptor) + { + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.UpdateSql, '?'); + // Off mode has no trailing concurrency-guard slot — we just consume + // data + binders + id + tenant + partition PK. + BindPreConcurrencyParameters(parameters, session); + } + + public override async Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + if (!IgnoreConcurrencyViolation) + { + exceptions.Add(new NonExistentDocumentException(typeof(TDoc), _id)); + } + } + } + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedClosedShapeUpsertOperation.cs b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeUpsertOperation.cs new file mode 100644 index 0000000000..84d704d3fb --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedClosedShapeUpsertOperation.cs @@ -0,0 +1,51 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal.Operations; +using Npgsql; +using Weasel.Core; +using Weasel.Postgresql; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off closed-shape Upsert. Plain +/// INSERT … ON CONFLICT (id) DO UPDATE SET … RETURNING id — no +/// version/revision guard. PostprocessAsync is fire-and-forget: the +/// RETURNING row is there for symmetry with Insert/Update but the result +/// isn't inspected (Off-mode Upsert always wins, no concurrency-failure +/// outcome to detect). #4659 leaf. +/// +internal sealed class UnversionedClosedShapeUpsertOperation: ClosedShapeUpsertOperation + where TDoc : notnull + where TId : notnull +{ + public UnversionedClosedShapeUpsertOperation( + TDoc document, + TId id, + string tenantId, + DocumentStorageDescriptor descriptor, + OperationRole role) + : base(document, id, tenantId, descriptor, role) + { + } + + public override void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + var parameters = builder.AppendWithParameters(_descriptor.UpsertSql, '?'); + BindPreOnConflictParameters(parameters, session); + // Off-mode UpsertSql has no trailing concurrency-extras slot. + } + + public override Task PostprocessAsync(DbDataReader reader, IList exceptions, CancellationToken token) + => Task.CompletedTask; + + protected override int BindClientSideBinder(NpgsqlParameter[] parameters, int slot, IDocumentMetadataBinder binder, IMartenSession session) + { + binder.BindParameter(parameters[slot], _document, session); + return slot + 1; + } +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedDirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/UnversionedDirtyCheckedClosedShapeStorage.cs new file mode 100644 index 0000000000..19c5c55721 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedDirtyCheckedClosedShapeStorage.cs @@ -0,0 +1,38 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off closed-shape DirtyChecked storage. #4659 leaf. +/// +internal sealed class UnversionedDirtyCheckedClosedShapeStorage: DirtyCheckedClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public UnversionedDirtyCheckedClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + + public override ISelector BuildSelector(IMartenSession session) + => new UnversionedClosedShapeDirtyTrackingSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedIdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/UnversionedIdentityMapClosedShapeStorage.cs new file mode 100644 index 0000000000..493d6f5126 --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedIdentityMapClosedShapeStorage.cs @@ -0,0 +1,38 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off closed-shape IdentityMap storage. #4659 leaf. +/// +internal sealed class UnversionedIdentityMapClosedShapeStorage: IdentityMapClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public UnversionedIdentityMapClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + + public override ISelector BuildSelector(IMartenSession session) + => new UnversionedClosedShapeIdentityMapSelector(session, _descriptor); +} diff --git a/src/Marten/Internal/ClosedShape/UnversionedLightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/UnversionedLightweightClosedShapeStorage.cs new file mode 100644 index 0000000000..becd8bb0aa --- /dev/null +++ b/src/Marten/Internal/ClosedShape/UnversionedLightweightClosedShapeStorage.cs @@ -0,0 +1,41 @@ +#nullable enable +using Marten.Internal; +using Marten.Internal.Operations; +using Marten.Linq.Selectors; +using Marten.Schema; + +namespace Marten.Internal.ClosedShape; + +/// +/// ConcurrencyMode.Off closed-shape Lightweight storage. No +/// version / revision tracker plumbed — each write factory constructs +/// its Unversioned… operation leaf and BuildSelector returns the +/// unversioned read selector. #4659 leaf. +/// +internal sealed class UnversionedLightweightClosedShapeStorage: LightweightClosedShapeStorage + where TDoc : notnull + where TId : notnull +{ + public UnversionedLightweightClosedShapeStorage(DocumentMapping mapping, DocumentStorageDescriptor descriptor) + : base(mapping, descriptor) + { + } + + public override IStorageOperation Insert(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation Update(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation Upsert(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert); + + public override IStorageOperation Overwrite(TDoc document, IMartenSession session, string tenant) + => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation OverwriteProjected(TDoc document, string tenant) + => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + + public override ISelector BuildSelector(IMartenSession session) + => new UnversionedClosedShapeLightweightSelector(session, _descriptor); +}