Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions src/Marten/Events/Fetching/FetchAsyncPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Marten.Internal.Storage;
using Marten.Linq.QueryHandlers;
using Marten.Schema;
using Marten.Storage;
using Npgsql;
using Weasel.Core;
using Weasel.Postgresql;
Expand Down Expand Up @@ -73,8 +74,18 @@ public FetchAsyncPlan(EventGraph events, IEventIdentityStrategy<TId> identityStr
_storage = storage;
_aggregator = _events.Options.Projections.AggregatorFor<TDoc>();

_versionSelectionSql =
$" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = ";
if (_events.TenancyStyle == TenancyStyle.Single)
{
_versionSelectionSql =
$" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = ";
}
else
{
_versionSelectionSql =
$" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id and d.tenant_id = a.tenant_id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = ";
}


}

public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, CancellationToken cancellation = default)
Expand Down Expand Up @@ -177,6 +188,14 @@ private void writeEventFetchStatement(TId id,
builder.Append(_initialSql);
builder.Append(_versionSelectionSql);
builder.AppendParameter(id);

// You must do this for performance even if the stream ids were
// magically unique across tenants
if (_events.TenancyStyle == TenancyStyle.Conjoined)
{
builder.Append(" and d.tenant_id = ");
builder.AppendParameter(builder.TenantId);
}
}

public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, long expectedStartingVersion,
Expand Down
Loading