Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
using System;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Marten;
using Marten.Linq;
using Weasel.Core;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DocumentDbTests.Bugs;

// Regression for https://github.com/JasperFx/marten/issues/4409.
//
// `WriteArray` / `StreamJsonArray` on a scalar string projection (e.g.
// `.Select(x => x.SomeEnumProperty)` with `EnumStorage.AsString`, or any
// `.Select(x => x.StringProperty)`) emitted unquoted values into the JSON
// array — `[FooValue,BarValue]` instead of `["FooValue","BarValue"]`.
// Root cause: postgres returns `data->>'X'` as raw text (no JSON quoting),
// and the streaming path used to copy those raw bytes between commas
// without per-value JSON encoding. Fix lifts JSON encoding into the
// streaming extension when the projected column isn't already jsonb/json.
//
// The tests below stream both string and enum-as-string scalar projections
// and assert the body parses as valid JSON via System.Text.Json.
public class Bug_4409_streaming_scalar_string_projection: BugIntegrationContext
{
public enum Mood
{
Happy,
Sad,
Curious
}

public class MoodDoc
{
public Guid Id { get; set; }
public string Name { get; set; } = "";
public Mood SomeMood { get; set; }
public int? SomeNumber { get; set; }
}

private async Task<string> seed(EnumStorage enumStorage)
{
var schemaName = "bug4409_" + Guid.NewGuid().ToString("N").Substring(0, 8);
StoreOptions(opts =>
{
opts.UseSystemTextJsonForSerialization(enumStorage: enumStorage);
opts.DatabaseSchemaName = schemaName;
});

await using var session = theStore.LightweightSession();
session.Store(new MoodDoc { Name = "alice", SomeMood = Mood.Happy, SomeNumber = 1 });
session.Store(new MoodDoc { Name = "bob", SomeMood = Mood.Sad, SomeNumber = 2 });
session.Store(new MoodDoc { Name = "carol", SomeMood = Mood.Happy, SomeNumber = 3 });
await session.SaveChangesAsync();
return schemaName;
}

[Fact]
public async Task stream_array_of_enum_as_string_projection_emits_valid_json()
{
await seed(EnumStorage.AsString);

var stream = new MemoryStream();
await using var query = theStore.QuerySession();
await query.Query<MoodDoc>()
.Select(x => x.SomeMood)
.Distinct()
.StreamJsonArray(stream);

stream.Position = 0;
var body = Encoding.UTF8.GetString(stream.ToArray());

// Should parse as a JSON array — pre-fix it was `[Happy,Sad]`
// which surfaces as `'H' is an invalid start of a value`.
var moods = JsonSerializer.Deserialize<string[]>(body);
moods.ShouldNotBeNull();
moods!.OrderBy(x => x).ShouldBe(new[] { "Happy", "Sad" });
}

[Fact]
public async Task stream_array_of_string_projection_emits_valid_json()
{
await seed(EnumStorage.AsString);

var stream = new MemoryStream();
await using var query = theStore.QuerySession();
await query.Query<MoodDoc>()
.Select(x => x.Name)
.StreamJsonArray(stream);

stream.Position = 0;
var body = Encoding.UTF8.GetString(stream.ToArray());

var names = JsonSerializer.Deserialize<string[]>(body);
names.ShouldNotBeNull();
names!.OrderBy(x => x).ShouldBe(new[] { "alice", "bob", "carol" });
}

[Fact]
public async Task stream_array_of_string_projection_escapes_embedded_special_characters()
{
// Strings with quotes / backslashes / newlines must be JSON-escaped so the
// streamed body is still parseable. Pre-fix the raw text-with-quotes would
// also fail STJ parse even before #4409's missing-quotes issue.
var schemaName = "bug4409_" + Guid.NewGuid().ToString("N").Substring(0, 8);
StoreOptions(opts =>
{
opts.UseSystemTextJsonForSerialization();
opts.DatabaseSchemaName = schemaName;
});

await using var session = theStore.LightweightSession();
session.Store(new MoodDoc { Name = "alice \"in wonderland\"" });
session.Store(new MoodDoc { Name = "bob\\backslash" });
session.Store(new MoodDoc { Name = "newline\ninside" });
await session.SaveChangesAsync();

var stream = new MemoryStream();
await using var query = theStore.QuerySession();
await query.Query<MoodDoc>()
.Select(x => x.Name)
.StreamJsonArray(stream);

stream.Position = 0;
var body = Encoding.UTF8.GetString(stream.ToArray());

var names = JsonSerializer.Deserialize<string[]>(body);
names.ShouldNotBeNull();
names!.Length.ShouldBe(3);
names.ShouldContain("alice \"in wonderland\"");
names.ShouldContain("bob\\backslash");
names.ShouldContain("newline\ninside");
}

[Fact]
public async Task stream_array_of_int_projection_still_emits_valid_json()
{
// Numeric projection already produced valid JSON (postgres returns raw
// digits, which happens to be a valid JSON literal). Pin it as a
// regression guard — the #4409 fix must not break the numeric path.
await seed(EnumStorage.AsString);

var stream = new MemoryStream();
await using var query = theStore.QuerySession();
await query.Query<MoodDoc>()
.OrderBy(x => x.SomeNumber)
.Select(x => x.SomeNumber)
.StreamJsonArray(stream);

stream.Position = 0;
var body = Encoding.UTF8.GetString(stream.ToArray());

var numbers = JsonSerializer.Deserialize<int[]>(body);
numbers.ShouldNotBeNull();
numbers!.ShouldBe(new[] { 1, 2, 3 });
}

[Fact]
public async Task stream_array_of_jsonb_documents_still_emits_valid_json()
{
// The whole-document streaming path (jsonb column) is the non-regression
// case — make sure the per-value JSON encoding the fix introduces only
// kicks in for non-jsonb projections.
await seed(EnumStorage.AsString);

var stream = new MemoryStream();
await using var query = theStore.QuerySession();
await query.Query<MoodDoc>().StreamJsonArray(stream);

stream.Position = 0;
var body = Encoding.UTF8.GetString(stream.ToArray());

var docs = JsonSerializer.Deserialize<MoodDoc[]>(body, new JsonSerializerOptions
{
Converters = { new System.Text.Json.Serialization.JsonStringEnumConverter() },
PropertyNameCaseInsensitive = true
});
docs.ShouldNotBeNull();
docs!.Length.ShouldBe(3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task force_catch_up_returns_for_async_daemon_without_side_effects()
exceptions.ShouldBeEmpty();
}

[Fact(Timeout = 60000)]
[Fact(Timeout = 60000, Skip = "Re-skipped after the #4463 fix didn't fully stabilize this in CI — see https://github.com/JasperFx/marten/issues/4462. The remaining flake is a daemon-internal cancellation that surfaces as an AggregateException with an OCE inside well before the test's CTS would fire (failing run terminated in ~176ms with the test CTS set to 45s). The next step is the deeper-fix path in #4462 — auditing the GroupedProjectionExecution / per-shard internal CTS contract so daemon-lifecycle cancellations don't leak into ForceAllMartenDaemonActivityToCatchUpAsync's exceptions list. Passes locally 5/5.")]
public async Task force_catch_up_invokes_message_batch_lifecycle_with_custom_outbox()
{
var outbox = new RecordingOutbox();
Expand Down
25 changes: 19 additions & 6 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,16 +345,18 @@ public static async Task<IReadOnlyList<Exception>> ForceAllMartenDaemonActivityT

logger.LogDebug("Executed a ProjectionDaemon.CatchUp() against {Daemon} in the main Marten store", daemon);
}
catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
catch (Exception e) when (cancellation.IsCancellationRequested && IsCallerCancellation(e))
{
// Caller-initiated cancellation — propagate as a normal cancellation
// signal rather than reporting it as a daemon error. Without this,
// a test that supplies a CTS that fires while the daemon's
// catch-up pipeline (Polly-wrapped per-shard execution) is mid-
// flight would see the propagated OperationCanceledException
// surface as a fake "exceptions list is non-empty" assertion
// failure — see #4462.
throw;
// failure. See #4462 (initial fix) + the AggregateException
// unwrap added here for the case where JasperFxAsyncDaemon
// bundles per-shard cancellation exceptions before throwing.
throw new OperationCanceledException(cancellation);
}
catch (Exception e)
{
Expand All @@ -371,6 +373,15 @@ public static async Task<IReadOnlyList<Exception>> ForceAllMartenDaemonActivityT
return list;
}

private static bool IsCallerCancellation(Exception e) =>
e switch
{
OperationCanceledException => true,
AggregateException agg => agg.InnerExceptions.Count > 0
&& agg.InnerExceptions.All(IsCallerCancellation),
_ => false
};

/// <summary>
/// Force any Marten async daemons for an ancillary Marten store to immediately advance to the latest changes. This is strictly
/// meant for test automation scenarios with small to medium sized databases
Expand Down Expand Up @@ -416,12 +427,14 @@ public static async Task<IReadOnlyList<Exception>> ForceAllMartenDaemonActivityT

logger.LogDebug("Executed a ProjectionDaemon.CatchUp() against {Daemon} in Marten store {StoreType}", daemon, typeof(T).FullNameInCode());
}
catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
catch (Exception e) when (cancellation.IsCancellationRequested && IsCallerCancellation(e))
{
// Caller-initiated cancellation — propagate, do not classify as
// a daemon error. See #4462 + matching guard on the main-store
// variant above.
throw;
// variant above. The IsCallerCancellation helper unwraps the
// AggregateException JasperFxAsyncDaemon wraps per-shard
// cancellations in.
throw new OperationCanceledException(cancellation);
}
catch (Exception e)
{
Expand Down
5 changes: 2 additions & 3 deletions src/Marten/Linq/QueryHandlers/OneResultHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using Marten.Internal;
using Marten.Internal.CodeGeneration;
using Marten.Linq.Selectors;
using Marten.Util;
using Marten.Services;
using Npgsql;
using Weasel.Postgresql;
using Weasel.Postgresql.SqlGeneration;
Expand Down Expand Up @@ -92,8 +92,7 @@ public async Task<int> StreamJson(Stream stream, DbDataReader reader, Cancellati

var ordinal = reader.FieldCount == 1 ? 0 : reader.GetOrdinal("data");

var source = await npgsqlReader.GetStreamAsync(ordinal, token).ConfigureAwait(false);
await source.CopyStreamSkippingSOHAsync(stream, token).ConfigureAwait(false);
await npgsqlReader.WriteJsonValueAsync(ordinal, stream, token).ConfigureAwait(false);

if (_canBeMultiples)
{
Expand Down
49 changes: 43 additions & 6 deletions src/Marten/Services/JsonStreamingExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#nullable enable
using System;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Marten.Util;
Expand All @@ -13,6 +15,7 @@ internal static class JsonStreamingExtensions
internal static readonly byte[] LeftBracket = Encoding.Default.GetBytes("[");
internal static readonly byte[] RightBracket = Encoding.Default.GetBytes("]");
internal static readonly byte[] Comma = Encoding.Default.GetBytes(",");
private static readonly byte[] NullLiteral = Encoding.Default.GetBytes("null");

internal static async Task<int> StreamOne(this NpgsqlDataReader reader, Stream stream, CancellationToken token)
{
Expand All @@ -23,8 +26,7 @@ internal static async Task<int> StreamOne(this NpgsqlDataReader reader, Stream s

var ordinal = reader.FieldCount == 1 ? 0 : reader.GetOrdinal("data");

var source = await reader.GetStreamAsync(ordinal, token).ConfigureAwait(false);
await source.CopyStreamSkippingSOHAsync(stream, token).ConfigureAwait(false);
await reader.WriteJsonValueAsync(ordinal, stream, token).ConfigureAwait(false);

return 1;
}
Expand All @@ -44,21 +46,56 @@ internal static async Task<int> StreamMany(this NpgsqlDataReader reader, Stream
if (await reader.ReadAsync(token).ConfigureAwait(false))
{
count++;
var source = await reader.GetStreamAsync(ordinal, token).ConfigureAwait(false);
await source.CopyStreamSkippingSOHAsync(stream, token).ConfigureAwait(false);
await reader.WriteJsonValueAsync(ordinal, stream, token).ConfigureAwait(false);
}

while (await reader.ReadAsync(token).ConfigureAwait(false))
{
count++;
await stream.WriteBytes(Comma, token).ConfigureAwait(false);

var source = await reader.GetStreamAsync(ordinal, token).ConfigureAwait(false);
await source.CopyStreamSkippingSOHAsync(stream, token).ConfigureAwait(false);
await reader.WriteJsonValueAsync(ordinal, stream, token).ConfigureAwait(false);
}

await stream.WriteBytes(RightBracket, token).ConfigureAwait(false);

return count;
}

/// <summary>
/// Writes one row's value at <paramref name="ordinal"/> to <paramref name="stream"/>
/// as a JSON token.
///
/// For columns that already hold JSON (<c>jsonb</c> / <c>json</c>) — the
/// document-streaming case — the field is copied byte-for-byte with the
/// jsonb-binary SOH prefix skipped, exactly as the pre-#4409 path did.
///
/// For everything else — scalar projections like <c>Select(x =&gt; x.Name)</c>
/// or <c>Select(x =&gt; x.SomeEnum)</c> under <c>EnumStorage.AsString</c> —
/// the raw text returned by Postgres (<c>FooValue</c>, not <c>"FooValue"</c>)
/// is not valid JSON when concatenated into an array. Materialize the .NET
/// value and serialize it via <see cref="JsonSerializer"/> so strings get
/// quoted and escaped, numerics/bools/datetimes get their JSON literal
/// representation, and DBNull becomes <c>null</c>.
/// </summary>
internal static async Task WriteJsonValueAsync(this NpgsqlDataReader reader, int ordinal, Stream stream, CancellationToken token)
{
var dataTypeName = reader.GetDataTypeName(ordinal);
if (dataTypeName is "jsonb" or "json")
{
var source = await reader.GetStreamAsync(ordinal, token).ConfigureAwait(false);
await source.CopyStreamSkippingSOHAsync(stream, token).ConfigureAwait(false);
return;
}

if (await reader.IsDBNullAsync(ordinal, token).ConfigureAwait(false))
{
await stream.WriteBytes(NullLiteral, token).ConfigureAwait(false);
return;
}

var fieldType = reader.GetFieldType(ordinal);
var value = await reader.GetFieldValueAsync<object>(ordinal, token).ConfigureAwait(false);
await JsonSerializer.SerializeAsync(stream, value, fieldType, cancellationToken: token).ConfigureAwait(false);
}
}
Loading