diff --git a/src/Akka.Persistence.Sql.Tests.Common/Containers/SqlServer2016Container.cs b/src/Akka.Persistence.Sql.Tests.Common/Containers/SqlServer2016Container.cs
new file mode 100644
index 00000000..d09ab0a1
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests.Common/Containers/SqlServer2016Container.cs
@@ -0,0 +1,108 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using System;
+using System.Collections.Generic;
+using System.Data.Common;
+using System.Threading.Tasks;
+using Akka.Util;
+using Docker.DotNet.Models;
+using Microsoft.Data.SqlClient;
+
+namespace Akka.Persistence.Sql.Tests.Common.Containers
+{
+ ///
+ /// Runs a SQL Server 2022 container but sets the database compatibility level to 130
+ /// (SQL Server 2016) so that LinqToDB auto-detects it as SQL Server 2016.
+ /// This means STRING_AGG is not available, forcing the client-side tag aggregation fallback.
+ /// Uses the generic "SqlServer" provider name to trigger LinqToDB auto-detection.
+ ///
+ public sealed class SqlServer2016Container : DockerContainer
+ {
+ private const string User = "sa";
+
+ private const string Password = "Password12!";
+
+ private readonly DbConnectionStringBuilder _connectionStringBuilder;
+
+ public SqlServer2016Container() : base("mcr.microsoft.com/mssql/server", "2022-latest", $"mssql2016-{Guid.NewGuid():N}")
+ => _connectionStringBuilder = new DbConnectionStringBuilder
+ {
+ ["Server"] = $"localhost,{Port}",
+ ["User Id"] = User,
+ ["Password"] = Password,
+ ["TrustServerCertificate"] = "true",
+ };
+
+ public override string ConnectionString => _connectionStringBuilder.ToString();
+
+ // Use generic "SqlServer" so LinqToDB auto-detects version from compatibility level
+ public override string ProviderName => LinqToDB.ProviderName.SqlServer;
+
+ private int Port { get; } = ThreadLocalRandom.Current.Next(9000, 10000);
+
+ protected override string ReadyMarker => "Recovery is complete.";
+
+ protected override void GenerateDatabaseName()
+ {
+ base.GenerateDatabaseName();
+
+ _connectionStringBuilder["Database"] = DatabaseName;
+ }
+
+ protected override void ConfigureContainer(CreateContainerParameters parameters)
+ {
+ parameters.ExposedPorts = new Dictionary
+ {
+ ["1433/tcp"] = new(),
+ };
+
+ parameters.HostConfig = new HostConfig
+ {
+ PortBindings = new Dictionary>
+ {
+ ["1433/tcp"] = new List { new() { HostPort = $"{Port}" } },
+ },
+ };
+
+ parameters.Env = new[]
+ {
+ "ACCEPT_EULA=Y",
+ $"MSSQL_SA_PASSWORD={Password}",
+ "MSSQL_PID=Express",
+ };
+ }
+
+ public override async Task InitializeDbAsync()
+ {
+ _connectionStringBuilder["Database"] = "master";
+
+ await using var connection = new SqlConnection(ConnectionString);
+ await connection.OpenAsync();
+
+ GenerateDatabaseName();
+
+ await using var createCommand = new SqlCommand
+ {
+ CommandText = $"CREATE DATABASE [{DatabaseName}]",
+ Connection = connection,
+ };
+ await createCommand.ExecuteNonQueryAsync();
+
+ // Set compatibility level to 130 (SQL Server 2016)
+ // This causes LinqToDB to auto-detect the version as SQL Server 2016,
+ // which does not support STRING_AGG
+ await using var compatCommand = new SqlCommand
+ {
+ CommandText = $"ALTER DATABASE [{DatabaseName}] SET COMPATIBILITY_LEVEL = 130",
+ Connection = connection,
+ };
+ await compatCommand.ExecuteNonQueryAsync();
+
+ await connection.CloseAsync();
+ }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016AllEventsSpec.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016AllEventsSpec.cs
new file mode 100644
index 00000000..fc57c345
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016AllEventsSpec.cs
@@ -0,0 +1,28 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Config;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.Sql.Tests.Common.Query;
+using Akka.Persistence.Sql.Tests.SqlServer;
+using Xunit;
+using Xunit.Abstractions;
+#if !DEBUG
+using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
+#endif
+
+namespace Akka.Persistence.Sql.Tests.Query.SqlServer2016.TagTable
+{
+#if !DEBUG
+ [SkipWindows]
+#endif
+ [Collection(nameof(SqlServer2016PersistenceSpec))]
+ public class SqlServer2016AllEventsSpec : BaseAllEventsSpec
+ {
+ public SqlServer2016AllEventsSpec(ITestOutputHelper output, SqlServer2016Container fixture)
+ : base(TagMode.TagTable, output, nameof(SqlServer2016AllEventsSpec), fixture) { }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016CurrentAllEventsSpec.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016CurrentAllEventsSpec.cs
new file mode 100644
index 00000000..d30bbeb8
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016CurrentAllEventsSpec.cs
@@ -0,0 +1,28 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Config;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.Sql.Tests.Common.Query;
+using Akka.Persistence.Sql.Tests.SqlServer;
+using Xunit;
+using Xunit.Abstractions;
+#if !DEBUG
+using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
+#endif
+
+namespace Akka.Persistence.Sql.Tests.Query.SqlServer2016.TagTable
+{
+#if !DEBUG
+ [SkipWindows]
+#endif
+ [Collection(nameof(SqlServer2016PersistenceSpec))]
+ public class SqlServer2016CurrentAllEventsSpec : BaseCurrentAllEventsSpec
+ {
+ public SqlServer2016CurrentAllEventsSpec(ITestOutputHelper output, SqlServer2016Container fixture)
+ : base(TagMode.TagTable, output, nameof(SqlServer2016CurrentAllEventsSpec), fixture) { }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016CurrentEventsByTagSpec.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016CurrentEventsByTagSpec.cs
new file mode 100644
index 00000000..864384a5
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016CurrentEventsByTagSpec.cs
@@ -0,0 +1,28 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Config;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.Sql.Tests.Common.Query;
+using Akka.Persistence.Sql.Tests.SqlServer;
+using Xunit;
+using Xunit.Abstractions;
+#if !DEBUG
+using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
+#endif
+
+namespace Akka.Persistence.Sql.Tests.Query.SqlServer2016.TagTable
+{
+#if !DEBUG
+ [SkipWindows]
+#endif
+ [Collection(nameof(SqlServer2016PersistenceSpec))]
+ public class SqlServer2016CurrentEventsByTagSpec : BaseCurrentEventsByTagSpec
+ {
+ public SqlServer2016CurrentEventsByTagSpec(ITestOutputHelper output, SqlServer2016Container fixture)
+ : base(TagMode.TagTable, output, nameof(SqlServer2016CurrentEventsByTagSpec), fixture) { }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016EventsByPersistenceIdSpec.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016EventsByPersistenceIdSpec.cs
new file mode 100644
index 00000000..f8daeeb4
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016EventsByPersistenceIdSpec.cs
@@ -0,0 +1,28 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Config;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.Sql.Tests.Common.Query;
+using Akka.Persistence.Sql.Tests.SqlServer;
+using Xunit;
+using Xunit.Abstractions;
+#if !DEBUG
+using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
+#endif
+
+namespace Akka.Persistence.Sql.Tests.Query.SqlServer2016.TagTable
+{
+#if !DEBUG
+ [SkipWindows]
+#endif
+ [Collection(nameof(SqlServer2016PersistenceSpec))]
+ public class SqlServer2016EventsByPersistenceIdSpec : BaseEventsByPersistenceIdSpec
+ {
+ public SqlServer2016EventsByPersistenceIdSpec(ITestOutputHelper output, SqlServer2016Container fixture)
+ : base(TagMode.TagTable, output, nameof(SqlServer2016EventsByPersistenceIdSpec), fixture) { }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016EventsByTagSpec.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016EventsByTagSpec.cs
new file mode 100644
index 00000000..56991f3a
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016EventsByTagSpec.cs
@@ -0,0 +1,84 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using System;
+using System.Linq;
+using System.Threading.Tasks;
+using Akka.Persistence.Sql.Config;
+using Akka.Persistence.Sql.Db;
+using Akka.Persistence.Sql.Journal.Types;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.Sql.Tests.Common.Query;
+using Akka.Persistence.Sql.Tests.SqlServer;
+using FluentAssertions;
+using LinqToDB;
+using LinqToDB.Data;
+using LinqToDB.DataProvider.SqlServer;
+using Xunit;
+using Xunit.Abstractions;
+#if !DEBUG
+using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
+#endif
+
+namespace Akka.Persistence.Sql.Tests.Query.SqlServer2016.TagTable
+{
+#if !DEBUG
+ [SkipWindows]
+#endif
+ [Collection(nameof(SqlServer2016PersistenceSpec))]
+ public class SqlServer2016EventsByTagSpec : BaseEventsByTagSpec
+ {
+ private readonly SqlServer2016Container _fixture;
+
+ public SqlServer2016EventsByTagSpec(ITestOutputHelper output, SqlServer2016Container fixture)
+ : base(TagMode.TagTable, output, nameof(SqlServer2016EventsByTagSpec), fixture)
+ {
+ _fixture = fixture;
+ }
+
+ [Fact(DisplayName = "StringAggregate should throw on SQL Server 2016 compatibility level")]
+ public async Task StringAggregateShouldThrowOnSqlServer2016()
+ {
+ // Arrange: connect to the database using the generic "SqlServer" provider
+ // LinqToDB will auto-detect the compatibility level as 130 (SQL Server 2016)
+ var dataProvider = SqlServerTools.GetDataProvider(
+ SqlServerVersion.AutoDetect,
+ SqlServerProvider.MicrosoftDataSqlClient,
+ _fixture.ConnectionString);
+ await using var db = new DataConnection(dataProvider, _fixture.ConnectionString);
+
+ // Verify LinqToDB detected the version as SQL Server 2016
+ var sqlServerProvider = db.DataProvider as SqlServerDataProvider;
+ sqlServerProvider.Should().NotBeNull();
+ sqlServerProvider!.Version.Should().Be(SqlServerVersion.v2016,
+ "LinqToDB should auto-detect compatibility level 130 as SQL Server 2016");
+
+ // Verify our SupportsStringAggregate property returns false
+ var akkaConnection = new AkkaDataConnection(LinqToDB.ProviderName.SqlServer, db);
+ akkaConnection.SupportsStringAggregate.Should().BeFalse(
+ "SQL Server 2016 does not support STRING_AGG");
+
+ // Act & Assert: StringAggregate should throw when converted to SQL
+ var tagTable = db.GetTable();
+ var journalTable = db.GetTable();
+
+ // This is the exact query pattern used in AddTagDataFromTagTableWithStringAggregateAsync
+ var act = () => journalTable
+ .Select(x => new
+ {
+ row = x,
+ tags = tagTable
+ .Where(r => r.OrderingId == x.Ordering)
+ .StringAggregate(";", r => r.TagValue)
+ .ToValue(),
+ })
+ .ToList();
+
+ act.Should().Throw(
+ "STRING_AGG is not supported on SQL Server 2016 (compatibility level 130)");
+ }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016PersistenceIdsSpec.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016PersistenceIdsSpec.cs
new file mode 100644
index 00000000..68d348bf
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016PersistenceIdsSpec.cs
@@ -0,0 +1,28 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Config;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.Sql.Tests.Common.Query;
+using Akka.Persistence.Sql.Tests.SqlServer;
+using Xunit;
+using Xunit.Abstractions;
+#if !DEBUG
+using Akka.Persistence.Sql.Tests.Common.Internal.Xunit;
+#endif
+
+namespace Akka.Persistence.Sql.Tests.Query.SqlServer2016.TagTable
+{
+#if !DEBUG
+ [SkipWindows]
+#endif
+ [Collection(nameof(SqlServer2016PersistenceSpec))]
+ public class SqlServer2016PersistenceIdsSpec : BasePersistenceIdsSpec
+ {
+ public SqlServer2016PersistenceIdsSpec(ITestOutputHelper output, SqlServer2016Container fixture)
+ : base(TagMode.TagTable, output, nameof(SqlServer2016PersistenceIdsSpec), fixture) { }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016QueryThrottleSpecs.cs b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016QueryThrottleSpecs.cs
new file mode 100644
index 00000000..0d7fffe0
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Query/SqlServer2016/TagTable/SqlServer2016QueryThrottleSpecs.cs
@@ -0,0 +1,22 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Config;
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Akka.Persistence.Sql.Tests.SqlServer;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Akka.Persistence.Sql.Tests.Query.SqlServer2016.TagTable;
+
+[Collection(nameof(SqlServer2016PersistenceSpec))]
+public class SqlServer2016QueryThrottleSpecs : QueryThrottleSpecsBase
+{
+ public SqlServer2016QueryThrottleSpecs(ITestOutputHelper output, SqlServer2016Container fixture)
+ : base(TagMode.TagTable, output, nameof(SqlServer2016QueryThrottleSpecs), fixture)
+ {
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/Settings/SupportsStringAggregateSpec.cs b/src/Akka.Persistence.Sql.Tests/Settings/SupportsStringAggregateSpec.cs
new file mode 100644
index 00000000..d6220a57
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/Settings/SupportsStringAggregateSpec.cs
@@ -0,0 +1,60 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Db;
+using FluentAssertions;
+using LinqToDB.Data;
+using LinqToDB.DataProvider.SqlServer;
+using Xunit;
+
+namespace Akka.Persistence.Sql.Tests.Settings
+{
+ public class SupportsStringAggregateSpec
+ {
+ [Theory(DisplayName = "SQL Server versions before 2017 should not support StringAggregate")]
+ [InlineData(SqlServerVersion.v2005)]
+ [InlineData(SqlServerVersion.v2008)]
+ [InlineData(SqlServerVersion.v2012)]
+ [InlineData(SqlServerVersion.v2014)]
+ [InlineData(SqlServerVersion.v2016)]
+ public void SqlServerBelow2017ShouldNotSupportStringAggregate(SqlServerVersion version)
+ {
+ var dataProvider = SqlServerTools.GetDataProvider(version);
+ using var dataConnection = new DataConnection(dataProvider, "Server=fake;Database=fake;");
+ var akkaConnection = new AkkaDataConnection(LinqToDB.ProviderName.SqlServer, dataConnection);
+
+ akkaConnection.SupportsStringAggregate.Should().BeFalse(
+ $"SQL Server {version} does not support STRING_AGG");
+ }
+
+ [Theory(DisplayName = "SQL Server 2017 and above should support StringAggregate")]
+ [InlineData(SqlServerVersion.v2017)]
+ [InlineData(SqlServerVersion.v2019)]
+ [InlineData(SqlServerVersion.v2022)]
+ public void SqlServer2017AndAboveShouldSupportStringAggregate(SqlServerVersion version)
+ {
+ var dataProvider = SqlServerTools.GetDataProvider(version);
+ using var dataConnection = new DataConnection(dataProvider, "Server=fake;Database=fake;");
+ var akkaConnection = new AkkaDataConnection(LinqToDB.ProviderName.SqlServer, dataConnection);
+
+ akkaConnection.SupportsStringAggregate.Should().BeTrue(
+ $"SQL Server {version} supports STRING_AGG");
+ }
+
+ [Fact(DisplayName = "Non-SQL Server providers should always support StringAggregate")]
+ public void NonSqlServerProvidersShouldSupportStringAggregate()
+ {
+ // Use a PostgreSQL provider as representative non-SQL Server provider
+ var dataProvider = LinqToDB.DataProvider.PostgreSQL.PostgreSQLTools.GetDataProvider(
+ LinqToDB.DataProvider.PostgreSQL.PostgreSQLVersion.v15);
+ using var dataConnection = new DataConnection(dataProvider, "Host=fake;Database=fake;");
+ var akkaConnection = new AkkaDataConnection(LinqToDB.ProviderName.PostgreSQL15, dataConnection);
+
+ akkaConnection.SupportsStringAggregate.Should().BeTrue(
+ "non-SQL Server providers support STRING_AGG or GROUP_CONCAT equivalents");
+ }
+ }
+}
diff --git a/src/Akka.Persistence.Sql.Tests/SqlServer/SqlServer2016PersistenceSpec.cs b/src/Akka.Persistence.Sql.Tests/SqlServer/SqlServer2016PersistenceSpec.cs
new file mode 100644
index 00000000..363ccb6d
--- /dev/null
+++ b/src/Akka.Persistence.Sql.Tests/SqlServer/SqlServer2016PersistenceSpec.cs
@@ -0,0 +1,14 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2013-2023 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using Akka.Persistence.Sql.Tests.Common.Containers;
+using Xunit;
+
+namespace Akka.Persistence.Sql.Tests.SqlServer
+{
+ [CollectionDefinition(nameof(SqlServer2016PersistenceSpec), DisableParallelization = true)]
+ public sealed class SqlServer2016PersistenceSpec : ICollectionFixture { }
+}
diff --git a/src/Akka.Persistence.Sql/Db/AkkaDataConnection.cs b/src/Akka.Persistence.Sql/Db/AkkaDataConnection.cs
index 8fe38645..7c5fdb83 100644
--- a/src/Akka.Persistence.Sql/Db/AkkaDataConnection.cs
+++ b/src/Akka.Persistence.Sql/Db/AkkaDataConnection.cs
@@ -13,6 +13,7 @@
using LinqToDB;
using LinqToDB.Data;
using LinqToDB.DataProvider;
+using LinqToDB.DataProvider.SqlServer;
using LinqToDB.SchemaProvider;
namespace Akka.Persistence.Sql.Db
@@ -36,6 +37,23 @@ public AkkaDataConnection(
public IDataProvider DataProvider => _connection.DataProvider;
+ ///
+ /// Checks whether the resolved data provider supports SQL-level string aggregation
+ /// (STRING_AGG for SQL Server 2017+, GROUP_CONCAT for MySQL/SQLite).
+ /// Uses LinqToDB's auto-detected when available.
+ ///
+ internal bool SupportsStringAggregate
+ {
+ get
+ {
+ if (_connection.DataProvider is SqlServerDataProvider sqlServerProvider)
+ return sqlServerProvider.Version >= SqlServerVersion.v2017;
+
+ // All non-SQL Server providers (PostgreSQL, MySQL, SQLite) support StringAggregate
+ return true;
+ }
+ }
+
public ValueTask DisposeAsync()
=> _connection.DisposeAsync();
diff --git a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
index 671eaf85..6e415e86 100644
--- a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
+++ b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
@@ -305,6 +305,21 @@ CancellationToken token
}
private static async Task> AddTagDataFromTagTableAsync(IQueryable rowQuery, AkkaDataConnection connection, CancellationToken token)
+ {
+ if (connection.SupportsStringAggregate)
+ return await AddTagDataFromTagTableWithStringAggregateAsync(rowQuery, connection, token);
+
+ return await AddTagDataFromTagTableClientSideAsync(rowQuery, connection, token);
+ }
+
+ ///
+ /// Uses SQL-level STRING_AGG (or GROUP_CONCAT) to aggregate tags in a single query.
+ /// Supported by SQL Server 2017+, PostgreSQL 9.0+, MySQL, and SQLite.
+ ///
+ private static async Task> AddTagDataFromTagTableWithStringAggregateAsync(
+ IQueryable rowQuery,
+ AkkaDataConnection connection,
+ CancellationToken token)
{
var tagTable = connection.GetTable();
@@ -329,5 +344,44 @@ private static async Task> AddTagDataFromTagTableAsync(IQueryab
return result;
}
+
+ ///
+ /// Fallback implementation for providers that do not support STRING_AGG (e.g., SQL Server < 2017).
+ /// Fetches journal rows first, then queries tags separately and joins them client-side.
+ ///
+ private static async Task> AddTagDataFromTagTableClientSideAsync(
+ IQueryable rowQuery,
+ AkkaDataConnection connection,
+ CancellationToken token)
+ {
+ var rows = await rowQuery.ToListAsync(token);
+ if (rows.Count == 0)
+ return rows;
+
+ var orderings = rows.Select(r => r.Ordering).ToList();
+
+ var tagRows = await connection
+ .GetTable()
+ .Where(t => orderings.Contains(t.OrderingId))
+ .ToListAsync(token);
+
+ var tagsByOrdering = new Dictionary>();
+ foreach (var tagRow in tagRows)
+ {
+ if (!tagsByOrdering.TryGetValue(tagRow.OrderingId, out var list))
+ {
+ list = [];
+ tagsByOrdering[tagRow.OrderingId] = list;
+ }
+ list.Add(tagRow.TagValue);
+ }
+
+ foreach (var row in rows)
+ {
+ row.TagArray = tagsByOrdering.TryGetValue(row.Ordering, out var tags) ? tags.ToArray() : [];
+ }
+
+ return rows;
+ }
}
}