diff --git a/docs/documents/aspnetcore.md b/docs/documents/aspnetcore.md
index ac6f899098..4e118a8393 100644
--- a/docs/documents/aspnetcore.md
+++ b/docs/documents/aspnetcore.md
@@ -163,3 +163,71 @@ public Task Get3(Guid issueId, [FromServices] IQuerySession session, [FromQuery]
```
snippet source | anchor
+
+## Writing Event Sourcing Aggregates
+
+If you are using Marten's [event sourcing](/events/) and [single stream projections](/events/projections/single-stream-projections),
+the `WriteLatest()` extension method on `IEventStoreOperations` lets you stream the
+projected aggregate's JSON directly to an HTTP response. This is the event sourcing equivalent
+of `WriteById()` for documents.
+
+The key advantage is performance: for `Inline` projections, the aggregate already exists as raw JSONB
+in PostgreSQL and is streamed directly to the HTTP response with **zero deserialization or serialization**.
+For `Async` projections that are caught up, the same optimization applies. Only when the async daemon
+is behind does Marten fall back to rebuilding the aggregate in memory.
+
+This is built on top of `FetchLatest()` — see [Reading Aggregates](/events/projections/read-aggregates#fetchlatest)
+for details on how each projection lifecycle is handled.
+
+Usage with a `Guid`-identified stream:
+
+
+
+```cs
+[HttpGet("/order/{orderId:guid}")]
+public Task GetOrder(Guid orderId, [FromServices] IDocumentSession session)
+{
+ // Streams the raw JSON of the projected aggregate to the HTTP response
+ // without deserialization/serialization when the projection is stored inline
+ return session.Events.WriteLatest(orderId, HttpContext);
+}
+```
+snippet source | anchor
+
+
+Usage with a `string`-identified stream:
+
+
+
+```cs
+[HttpGet("/named-order/{orderId}")]
+public Task GetNamedOrder(string orderId, [FromServices] IDocumentSession session)
+{
+ return session.Events.WriteLatest(orderId, HttpContext);
+}
+```
+snippet source | anchor
+
+
+Like `WriteById()`, `WriteLatest()` returns a 200 status with the JSON body if the aggregate
+is found, or a 404 with no body if not found. You can customize the content type and success status code:
+
+```csharp
+// Use a custom status code and content type
+await session.Events.WriteLatest(orderId, HttpContext,
+ contentType: "application/json; charset=utf-8",
+ onFoundStatus: 201);
+```
+
+::: warning
+`WriteLatest()` requires `IDocumentSession` (not `IQuerySession`) because
+`FetchLatest()` is only available on `IDocumentSession`.
+:::
+
+There is also a lower-level `StreamLatestJson()` method on `IEventStoreOperations` that
+writes the raw JSON to any `Stream`, which you can use to build your own response handling:
+
+```csharp
+var stream = new MemoryStream();
+bool found = await session.Events.StreamLatestJson(orderId, stream);
+```
diff --git a/docs/events/projections/read-aggregates.md b/docs/events/projections/read-aggregates.md
index d463f7d673..6cdafc7547 100644
--- a/docs/events/projections/read-aggregates.md
+++ b/docs/events/projections/read-aggregates.md
@@ -46,6 +46,9 @@ new events, we *strongly* recommend using the [`FetchForWriting`](/scenarios/com
::: tip
`FetchLatest` is a little more lightweight in execution than `FetchForWriting` and
should be used if all you care about is read only data without appending new events.
+
+If you are serving HTTP APIs, see [Writing Event Sourcing Aggregates](/documents/aspnetcore#writing-event-sourcing-aggregates)
+for how to stream the aggregate's JSON directly to the HTTP response with zero deserialization using `WriteLatest()`.
:::
::: warning
diff --git a/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs b/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs
index 2e3a4abb51..0c8704d657 100644
--- a/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs
+++ b/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs
@@ -132,7 +132,11 @@ public async Task see_the_dead_letter_events()
// Drain the dead letter events queued up
await daemon.StopAllAsync();
+ #region sample_replacing_logger_per_session
+ // We frequently use this special marten logger per
+ // session to pipe Marten logging to the xUnit.Net output
theSession.Logger = new TestOutputMartenLogger(_output);
+ #endregion
var skipped = await theSession.Query().ToListAsync();
skipped.Where(x => x.ProjectionName == "NamedDocuments" && x.ShardName == "All")
diff --git a/src/IssueService/Controllers/WriteLatestController.cs b/src/IssueService/Controllers/WriteLatestController.cs
new file mode 100644
index 0000000000..ed7fc8d986
--- /dev/null
+++ b/src/IssueService/Controllers/WriteLatestController.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Threading.Tasks;
+using Marten;
+using Marten.AspNetCore;
+using Marten.Events;
+using Microsoft.AspNetCore.Mvc;
+
+namespace IssueService.Controllers;
+
+// Simple events for testing
+public record OrderPlaced(string Description, decimal Amount);
+public record OrderShipped;
+
+// Guid-identified aggregate for inline projection
+public class Order
+{
+ public Guid Id { get; set; }
+ public string Description { get; set; }
+ public decimal Amount { get; set; }
+ public bool Shipped { get; set; }
+
+ public void Apply(OrderPlaced placed)
+ {
+ Description = placed.Description;
+ Amount = placed.Amount;
+ }
+
+ public void Apply(OrderShipped _) => Shipped = true;
+}
+
+// String-identified aggregate for inline projection
+public class NamedOrder
+{
+ public string Id { get; set; }
+ public string Description { get; set; }
+ public decimal Amount { get; set; }
+ public bool Shipped { get; set; }
+
+ public void Apply(OrderPlaced placed)
+ {
+ Description = placed.Description;
+ Amount = placed.Amount;
+ }
+
+ public void Apply(OrderShipped _) => Shipped = true;
+}
+
+public class WriteLatestController: ControllerBase
+{
+ #region sample_write_latest_aggregate_to_httpresponse
+
+ [HttpGet("/order/{orderId:guid}")]
+ public Task GetOrder(Guid orderId, [FromServices] IDocumentSession session)
+ {
+ // Streams the raw JSON of the projected aggregate to the HTTP response
+ // without deserialization/serialization when the projection is stored inline
+ return session.Events.WriteLatest(orderId, HttpContext);
+ }
+
+ #endregion
+
+ #region sample_write_latest_aggregate_by_string_to_httpresponse
+
+ [HttpGet("/named-order/{orderId}")]
+ public Task GetNamedOrder(string orderId, [FromServices] IDocumentSession session)
+ {
+ return session.Events.WriteLatest(orderId, HttpContext);
+ }
+
+ #endregion
+}
diff --git a/src/IssueService/MartenSettings.cs b/src/IssueService/MartenSettings.cs
index 44f08ea1e9..9a2281525c 100644
--- a/src/IssueService/MartenSettings.cs
+++ b/src/IssueService/MartenSettings.cs
@@ -5,5 +5,6 @@ public class MartenSettings
{
public const string SECTION = "Marten";
public string SchemaName { get; set; }
+ public bool UseStringStreamIdentity { get; set; }
}
#endregion
diff --git a/src/IssueService/Startup.cs b/src/IssueService/Startup.cs
index e1a6dcc820..053574aa47 100644
--- a/src/IssueService/Startup.cs
+++ b/src/IssueService/Startup.cs
@@ -2,7 +2,10 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
+using IssueService.Controllers;
+using JasperFx.Events;
using Marten;
+using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
@@ -42,6 +45,16 @@ public void ConfigureServices(IServiceCollection services)
options.DatabaseSchemaName = martenSettings.SchemaName;
}
+ if (martenSettings.UseStringStreamIdentity)
+ {
+ options.Events.StreamIdentity = StreamIdentity.AsString;
+ options.Projections.Snapshot(SnapshotLifecycle.Inline);
+ }
+ else
+ {
+ options.Projections.Snapshot(SnapshotLifecycle.Inline);
+ }
+
return options;
}).UseLightweightSessions();
#endregion
diff --git a/src/Marten.AspNetCore.Testing/StringStreamAppFixture.cs b/src/Marten.AspNetCore.Testing/StringStreamAppFixture.cs
new file mode 100644
index 0000000000..523bb4dc09
--- /dev/null
+++ b/src/Marten.AspNetCore.Testing/StringStreamAppFixture.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Threading.Tasks;
+using Alba;
+using IssueService;
+using Microsoft.Extensions.DependencyInjection;
+using Xunit;
+
+namespace Marten.AspNetCore.Testing;
+
+public class StringStreamAppFixture: IAsyncLifetime
+{
+ private string SchemaName { get; } = "sch" + Guid.NewGuid().ToString().Replace("-", string.Empty);
+ public IAlbaHost Host { get; private set; }
+
+ public async Task InitializeAsync()
+ {
+ Host = await AlbaHost.For(b =>
+ {
+ b.ConfigureServices((context, services) =>
+ {
+ services.MartenDaemonModeIsSolo();
+
+ services.Configure(s =>
+ {
+ s.SchemaName = SchemaName;
+ s.UseStringStreamIdentity = true;
+ });
+ });
+ });
+ }
+
+ public async Task DisposeAsync()
+ {
+ await Host.DisposeAsync();
+ }
+}
+
+[CollectionDefinition("string_stream_integration")]
+public class StringStreamIntegrationCollection: ICollectionFixture
+{
+}
diff --git a/src/Marten.AspNetCore.Testing/write_latest_streaming_tests.cs b/src/Marten.AspNetCore.Testing/write_latest_streaming_tests.cs
new file mode 100644
index 0000000000..79a11c00f7
--- /dev/null
+++ b/src/Marten.AspNetCore.Testing/write_latest_streaming_tests.cs
@@ -0,0 +1,169 @@
+using System;
+using System.Threading.Tasks;
+using Alba;
+using IssueService.Controllers;
+using Microsoft.Extensions.DependencyInjection;
+using Shouldly;
+using Xunit;
+
+namespace Marten.AspNetCore.Testing;
+
+[Collection("integration")]
+public class write_latest_guid_streaming_tests: IntegrationContext
+{
+ private readonly IAlbaHost theHost;
+
+ public write_latest_guid_streaming_tests(AppFixture fixture): base(fixture)
+ {
+ theHost = fixture.Host;
+ }
+
+ [Fact]
+ public async Task stream_latest_aggregate_by_guid_hit()
+ {
+ var orderId = Guid.NewGuid();
+
+ var store = theHost.Services.GetRequiredService();
+ await using (var session = store.LightweightSession())
+ {
+ session.Events.StartStream(orderId, new OrderPlaced("Widget", 99.95m));
+ await session.SaveChangesAsync();
+ }
+
+ var result = await theHost.Scenario(s =>
+ {
+ s.Get.Url($"/order/{orderId}");
+ s.StatusCodeShouldBe(200);
+ s.ContentTypeShouldBe("application/json");
+ });
+
+ var read = result.ReadAsJson();
+ read.ShouldNotBeNull();
+ read.Description.ShouldBe("Widget");
+ read.Amount.ShouldBe(99.95m);
+ read.Shipped.ShouldBeFalse();
+ }
+
+ [Fact]
+ public async Task stream_latest_aggregate_by_guid_miss()
+ {
+ await theHost.Scenario(s =>
+ {
+ s.Get.Url($"/order/{Guid.NewGuid()}");
+ s.StatusCodeShouldBe(404);
+ });
+ }
+
+ [Fact]
+ public async Task stream_latest_aggregate_by_guid_with_multiple_events()
+ {
+ var orderId = Guid.NewGuid();
+
+ var store = theHost.Services.GetRequiredService();
+ await using (var session = store.LightweightSession())
+ {
+ session.Events.StartStream(orderId,
+ new OrderPlaced("Gadget", 149.99m),
+ new OrderShipped());
+ await session.SaveChangesAsync();
+ }
+
+ var result = await theHost.Scenario(s =>
+ {
+ s.Get.Url($"/order/{orderId}");
+ s.StatusCodeShouldBe(200);
+ s.ContentTypeShouldBe("application/json");
+ });
+
+ var read = result.ReadAsJson();
+ read.ShouldNotBeNull();
+ read.Description.ShouldBe("Gadget");
+ read.Amount.ShouldBe(149.99m);
+ read.Shipped.ShouldBeTrue();
+ }
+}
+
+[Collection("string_stream_integration")]
+public class write_latest_string_streaming_tests: IAsyncLifetime
+{
+ private readonly IAlbaHost theHost;
+ private readonly IDocumentStore Store;
+
+ public write_latest_string_streaming_tests(StringStreamAppFixture fixture)
+ {
+ theHost = fixture.Host;
+ Store = theHost.Services.GetRequiredService();
+ }
+
+ public async Task InitializeAsync()
+ {
+ await Store.Advanced.ResetAllData();
+ }
+
+ public Task DisposeAsync()
+ {
+ return Task.CompletedTask;
+ }
+
+ [Fact]
+ public async Task stream_latest_aggregate_by_string_hit()
+ {
+ var orderId = "order-" + Guid.NewGuid().ToString("N");
+
+ await using (var session = Store.LightweightSession())
+ {
+ session.Events.StartStream(orderId, new OrderPlaced("Widget", 99.95m));
+ await session.SaveChangesAsync();
+ }
+
+ var result = await theHost.Scenario(s =>
+ {
+ s.Get.Url($"/named-order/{orderId}");
+ s.StatusCodeShouldBe(200);
+ s.ContentTypeShouldBe("application/json");
+ });
+
+ var read = result.ReadAsJson();
+ read.ShouldNotBeNull();
+ read.Description.ShouldBe("Widget");
+ read.Amount.ShouldBe(99.95m);
+ read.Shipped.ShouldBeFalse();
+ }
+
+ [Fact]
+ public async Task stream_latest_aggregate_by_string_miss()
+ {
+ await theHost.Scenario(s =>
+ {
+ s.Get.Url($"/named-order/nonexistent-{Guid.NewGuid():N}");
+ s.StatusCodeShouldBe(404);
+ });
+ }
+
+ [Fact]
+ public async Task stream_latest_aggregate_by_string_with_multiple_events()
+ {
+ var orderId = "order-" + Guid.NewGuid().ToString("N");
+
+ await using (var session = Store.LightweightSession())
+ {
+ session.Events.StartStream(orderId,
+ new OrderPlaced("Gadget", 149.99m),
+ new OrderShipped());
+ await session.SaveChangesAsync();
+ }
+
+ var result = await theHost.Scenario(s =>
+ {
+ s.Get.Url($"/named-order/{orderId}");
+ s.StatusCodeShouldBe(200);
+ s.ContentTypeShouldBe("application/json");
+ });
+
+ var read = result.ReadAsJson();
+ read.ShouldNotBeNull();
+ read.Description.ShouldBe("Gadget");
+ read.Amount.ShouldBe(149.99m);
+ read.Shipped.ShouldBeTrue();
+ }
+}
diff --git a/src/Marten.AspNetCore/QueryableExtensions.cs b/src/Marten.AspNetCore/QueryableExtensions.cs
index de482e29ad..3560372f37 100644
--- a/src/Marten.AspNetCore/QueryableExtensions.cs
+++ b/src/Marten.AspNetCore/QueryableExtensions.cs
@@ -2,6 +2,7 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;
+using Marten.Events;
using Marten.Linq;
using Microsoft.AspNetCore.Http;
@@ -325,4 +326,76 @@ params object[] parameters
await session.StreamJson(context.Response.Body, context.RequestAborted, sql, parameters).ConfigureAwait(false);
}
+ ///
+ /// Write the raw JSON of the latest projected aggregate T by Guid id directly to the HttpContext response,
+ /// avoiding deserialization/serialization round-trips when possible. Returns 404 if not found.
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// Defaults to 200
+ ///
+ public static async Task WriteLatest(
+ this IEventStoreOperations events,
+ Guid id,
+ HttpContext context,
+ string contentType = "application/json",
+ int onFoundStatus = 200
+ ) where T : class
+ {
+ var stream = Marten.Internal.SharedMemoryStreamManager.GetStream();
+ var found = await events.StreamLatestJson(id, stream, context.RequestAborted).ConfigureAwait(false);
+ if (found)
+ {
+ context.Response.StatusCode = onFoundStatus;
+ context.Response.ContentLength = stream.Length;
+ context.Response.ContentType = contentType;
+
+ stream.Position = 0;
+ await stream.CopyToAsync(context.Response.Body, context.RequestAborted).ConfigureAwait(false);
+ }
+ else
+ {
+ context.Response.StatusCode = 404;
+ context.Response.ContentLength = 0;
+ }
+ }
+
+ ///
+ /// Write the raw JSON of the latest projected aggregate T by string id directly to the HttpContext response,
+ /// avoiding deserialization/serialization round-trips when possible. Returns 404 if not found.
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// Defaults to 200
+ ///
+ public static async Task WriteLatest(
+ this IEventStoreOperations events,
+ string id,
+ HttpContext context,
+ string contentType = "application/json",
+ int onFoundStatus = 200
+ ) where T : class
+ {
+ var stream = Marten.Internal.SharedMemoryStreamManager.GetStream();
+ var found = await events.StreamLatestJson(id, stream, context.RequestAborted).ConfigureAwait(false);
+ if (found)
+ {
+ context.Response.StatusCode = onFoundStatus;
+ context.Response.ContentLength = stream.Length;
+ context.Response.ContentType = contentType;
+
+ stream.Position = 0;
+ await stream.CopyToAsync(context.Response.Body, context.RequestAborted).ConfigureAwait(false);
+ }
+ else
+ {
+ context.Response.StatusCode = 404;
+ context.Response.ContentLength = 0;
+ }
+ }
+
}
diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs
index 3f52dd04a1..81b8e9351c 100644
--- a/src/Marten/Events/EventStore.FetchForWriting.cs
+++ b/src/Marten/Events/EventStore.FetchForWriting.cs
@@ -1,6 +1,7 @@
#nullable enable
using System;
using System.Collections.Generic;
+using System.IO;
using System.Threading;
using System.Threading.Tasks;
using ImTools;
@@ -180,6 +181,18 @@ public Task> FetchForExclusiveWriting(string key,
return plan.FetchForReading(_session, id, cancellation);
}
+ public Task StreamLatestJson(Guid id, Stream destination, CancellationToken cancellation = default) where T : class
+ {
+ var plan = FindFetchPlan();
+ return plan.StreamForReading(_session, id, destination, cancellation);
+ }
+
+ public Task StreamLatestJson(string id, Stream destination, CancellationToken cancellation = default) where T : class
+ {
+ var plan = FindFetchPlan();
+ return plan.StreamForReading(_session, id, destination, cancellation);
+ }
+
internal IAggregateFetchPlan FindFetchPlan() where TDoc : class where TId : notnull
{
if (typeof(TId) == typeof(Guid))
@@ -230,6 +243,8 @@ Task> FetchForWriting(DocumentSessionBase session, TId id, lo
ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation);
+ Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation);
+
// These two methods are for batching
IQueryHandler> BuildQueryHandler(QuerySession session, TId id,
long expectedStartingVersion);
diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs
index 504a10befe..58a212d9f0 100644
--- a/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs
+++ b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs
@@ -2,6 +2,7 @@
using System.Data.Common;
using System.IO;
using System.Linq;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
@@ -9,6 +10,8 @@
using Marten.Internal;
using Marten.Internal.Sessions;
using Marten.Linq.QueryHandlers;
+using Marten.Util;
+using Npgsql;
using Weasel.Postgresql;
namespace Marten.Events.Fetching;
@@ -56,6 +59,77 @@ internal partial class FetchAsyncPlan
return await readLatest(session, id, cancellation, loadHandler, reader, selector).ConfigureAwait(false);
}
+ public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation)
+ {
+ await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false);
+ await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false);
+
+ var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation)
+ .ConfigureAwait(false);
+
+ _initialSql ??=
+ $"select {selector.SelectFields().Select(x => "d." + x).Join(", ")} from {_events.DatabaseSchemaName}.mt_events as d";
+
+ var builder = new BatchBuilder { TenantId = session.TenantId };
+
+ var loadHandler = new LoadByIdHandler(_storage, id);
+ loadHandler.ConfigureCommand(builder, session);
+
+ builder.StartNewCommand();
+
+ writeEventFetchStatement(id, builder);
+
+ var batch = builder.Compile();
+ await using var reader = await session.ExecuteReaderAsync(batch, cancellation).ConfigureAwait(false);
+
+ // First result set: buffer raw JSONB (don't deserialize yet)
+ bool hasDocument = await reader.ReadAsync(cancellation).ConfigureAwait(false);
+ MemoryStream? rawJsonBuffer = null;
+
+ if (hasDocument)
+ {
+ var ordinal = reader.GetOrdinal("data");
+ if (!await reader.IsDBNullAsync(ordinal, cancellation).ConfigureAwait(false))
+ {
+ rawJsonBuffer = SharedMemoryStreamManager.GetStream();
+ var source = await ((NpgsqlDataReader)reader).GetStreamAsync(ordinal, cancellation).ConfigureAwait(false);
+ await source.CopyStreamSkippingSOHAsync(rawJsonBuffer, cancellation).ConfigureAwait(false);
+ }
+ }
+
+ // Second result set: check for newer events
+ await reader.NextResultAsync(cancellation).ConfigureAwait(false);
+ var events = await new ListQueryHandler(null, selector)
+ .HandleAsync(reader, session, cancellation).ConfigureAwait(false);
+
+ if (!events.Any() && rawJsonBuffer != null)
+ {
+ // Caught up — stream raw JSONB directly (zero deserialization)
+ rawJsonBuffer.Position = 0;
+ await rawJsonBuffer.CopyToAsync(destination, cancellation).ConfigureAwait(false);
+ return true;
+ }
+
+ if (rawJsonBuffer == null && !events.Any())
+ return false;
+
+ // Not caught up — deserialize stored doc, rebuild with new events, serialize
+ TDoc? document = null;
+ if (rawJsonBuffer != null)
+ {
+ rawJsonBuffer.Position = 0;
+ document = session.Serializer.FromJson(rawJsonBuffer);
+ }
+
+ document = await _aggregator.BuildAsync(events, session, document, id, _storage, cancellation).ConfigureAwait(false);
+ if (document == null) return false;
+
+ _storage.SetIdentity(document, id);
+ var json = session.Serializer.ToJson(document);
+ await destination.WriteAsync(Encoding.UTF8.GetBytes(json), cancellation).ConfigureAwait(false);
+ return true;
+ }
+
private async Task readLatest(DocumentSessionBase session, TId id, CancellationToken cancellation,
LoadByIdHandler loadHandler, DbDataReader reader, IEventStorage selector)
{
diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs
index 77c572b302..52ab9d553c 100644
--- a/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs
+++ b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs
@@ -1,3 +1,4 @@
+using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Marten.Internal.Sessions;
@@ -34,6 +35,14 @@ internal partial class FetchInlinedPlan
return document;
}
+ public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation)
+ {
+ var storage = findDocumentStorage(session);
+ await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false);
+ var command = ((IDocumentStorage)storage).BuildLoadCommand(id, session.TenantId);
+ return await session.StreamOne(command, destination, cancellation).ConfigureAwait(false);
+ }
+
public IQueryHandler BuildQueryHandler(QuerySession session, TId id)
{
var storage = findDocumentStorage(session);
diff --git a/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs
index 6f4c35f2ca..0c8faac68a 100644
--- a/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs
+++ b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Data.Common;
using System.IO;
+using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
@@ -44,6 +45,16 @@ internal partial class FetchLivePlan
return await _aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false);
}
+ public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation)
+ {
+ var aggregate = await FetchForReading(session, id, cancellation).ConfigureAwait(false);
+ if (aggregate == null) return false;
+
+ var json = session.Serializer.ToJson(aggregate);
+ await destination.WriteAsync(Encoding.UTF8.GetBytes(json), cancellation).ConfigureAwait(false);
+ return true;
+ }
+
public IQueryHandler BuildQueryHandler(QuerySession session, TId id)
{
return new ReadOnlyQueryHandler(this, id);
diff --git a/src/Marten/Events/IEventStoreOperations.cs b/src/Marten/Events/IEventStoreOperations.cs
index 85929c3d41..cf52b60cdc 100644
--- a/src/Marten/Events/IEventStoreOperations.cs
+++ b/src/Marten/Events/IEventStoreOperations.cs
@@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
@@ -378,6 +379,30 @@ Task WriteExclusivelyToAggregate(string id, Func, Task> writi
///
ValueTask FetchLatest(string id, CancellationToken cancellation = default) where T : class;
+ ///
+ /// Stream the raw JSON of the projected aggregate T by id directly to a destination stream.
+ /// This avoids any deserialization/serialization round-trip when the aggregate is stored inline or
+ /// the async projection is caught up. Returns true if found, false if not found.
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// True if the aggregate was found and written to the destination stream
+ Task StreamLatestJson(Guid id, Stream destination, CancellationToken cancellation = default) where T : class;
+
+ ///
+ /// Stream the raw JSON of the projected aggregate T by id directly to a destination stream.
+ /// This avoids any deserialization/serialization round-trip when the aggregate is stored inline or
+ /// the async projection is caught up. Returns true if found, false if not found.
+ ///
+ ///
+ ///
+ ///
+ ///
+ /// True if the aggregate was found and written to the destination stream
+ Task StreamLatestJson(string id, Stream destination, CancellationToken cancellation = default) where T : class;
+
///
/// Completely replace event data at a specified spot in the event store without changing
/// any stream identity or version information. Replaces all header information with empty.