Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions docs/documents/aspnetcore.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,71 @@ public Task Get3(Guid issueId, [FromServices] IQuerySession session, [FromQuery]
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/IssueService/Controllers/IssueController.cs#L69-L81' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_write_single_document_to_httpcontext_with_compiled_query' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Writing Event Sourcing Aggregates

If you are using Marten's [event sourcing](/events/) and [single stream projections](/events/projections/single-stream-projections),
the `WriteLatest<T>()` extension method on `IEventStoreOperations` lets you stream the
projected aggregate's JSON directly to an HTTP response. This is the event sourcing equivalent
of `WriteById<T>()` for documents.

The key advantage is performance: for `Inline` projections, the aggregate already exists as raw JSONB
in PostgreSQL and is streamed directly to the HTTP response with **zero deserialization or serialization**.
For `Async` projections that are caught up, the same optimization applies. Only when the async daemon
is behind does Marten fall back to rebuilding the aggregate in memory.

This is built on top of `FetchLatest<T>()` — see [Reading Aggregates](/events/projections/read-aggregates#fetchlatest)
for details on how each projection lifecycle is handled.

Usage with a `Guid`-identified stream:

<!-- snippet: sample_write_latest_aggregate_to_httpresponse -->
<a id='snippet-sample_write_latest_aggregate_to_httpresponse'></a>
```cs
[HttpGet("/order/{orderId:guid}")]
public Task GetOrder(Guid orderId, [FromServices] IDocumentSession session)
{
// Streams the raw JSON of the projected aggregate to the HTTP response
// without deserialization/serialization when the projection is stored inline
return session.Events.WriteLatest<Order>(orderId, HttpContext);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/IssueService/Controllers/WriteLatestController.cs#L50-L59' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_write_latest_aggregate_to_httpresponse' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Usage with a `string`-identified stream:

<!-- snippet: sample_write_latest_aggregate_by_string_to_httpresponse -->
<a id='snippet-sample_write_latest_aggregate_by_string_to_httpresponse'></a>
```cs
[HttpGet("/named-order/{orderId}")]
public Task GetNamedOrder(string orderId, [FromServices] IDocumentSession session)
{
return session.Events.WriteLatest<NamedOrder>(orderId, HttpContext);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/IssueService/Controllers/WriteLatestController.cs#L61-L68' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_write_latest_aggregate_by_string_to_httpresponse' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Like `WriteById<T>()`, `WriteLatest<T>()` returns a 200 status with the JSON body if the aggregate
is found, or a 404 with no body if not found. You can customize the content type and success status code:

```csharp
// Use a custom status code and content type
await session.Events.WriteLatest<Order>(orderId, HttpContext,
contentType: "application/json; charset=utf-8",
onFoundStatus: 201);
```

::: warning
`WriteLatest<T>()` requires `IDocumentSession` (not `IQuerySession`) because
`FetchLatest<T>()` is only available on `IDocumentSession`.
:::

There is also a lower-level `StreamLatestJson<T>()` method on `IEventStoreOperations` that
writes the raw JSON to any `Stream`, which you can use to build your own response handling:

```csharp
var stream = new MemoryStream();
bool found = await session.Events.StreamLatestJson<Order>(orderId, stream);
```
3 changes: 3 additions & 0 deletions docs/events/projections/read-aggregates.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ new events, we *strongly* recommend using the [`FetchForWriting`](/scenarios/com
::: tip
`FetchLatest` is a little more lightweight in execution than `FetchForWriting` and
should be used if all you care about is read only data without appending new events.

If you are serving HTTP APIs, see [Writing Event Sourcing Aggregates](/documents/aspnetcore#writing-event-sourcing-aggregates)
for how to stream the aggregate's JSON directly to the HTTP response with zero deserialization using `WriteLatest<T>()`.
:::

::: warning
Expand Down
4 changes: 4 additions & 0 deletions src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ public async Task see_the_dead_letter_events()
// Drain the dead letter events queued up
await daemon.StopAllAsync();

#region sample_replacing_logger_per_session
// We frequently use this special marten logger per
// session to pipe Marten logging to the xUnit.Net output
theSession.Logger = new TestOutputMartenLogger(_output);
#endregion
var skipped = await theSession.Query<DeadLetterEvent>().ToListAsync();

skipped.Where(x => x.ProjectionName == "NamedDocuments" && x.ShardName == "All")
Expand Down
71 changes: 71 additions & 0 deletions src/IssueService/Controllers/WriteLatestController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System;
using System.Threading.Tasks;
using Marten;
using Marten.AspNetCore;
using Marten.Events;
using Microsoft.AspNetCore.Mvc;

namespace IssueService.Controllers;

// Simple events for testing
public record OrderPlaced(string Description, decimal Amount);
public record OrderShipped;

// Guid-identified aggregate for inline projection
public class Order
{
public Guid Id { get; set; }
public string Description { get; set; }
public decimal Amount { get; set; }
public bool Shipped { get; set; }

public void Apply(OrderPlaced placed)
{
Description = placed.Description;
Amount = placed.Amount;
}

public void Apply(OrderShipped _) => Shipped = true;
}

// String-identified aggregate for inline projection
public class NamedOrder
{
public string Id { get; set; }
public string Description { get; set; }
public decimal Amount { get; set; }
public bool Shipped { get; set; }

public void Apply(OrderPlaced placed)
{
Description = placed.Description;
Amount = placed.Amount;
}

public void Apply(OrderShipped _) => Shipped = true;
}

public class WriteLatestController: ControllerBase
{
#region sample_write_latest_aggregate_to_httpresponse

[HttpGet("/order/{orderId:guid}")]
public Task GetOrder(Guid orderId, [FromServices] IDocumentSession session)
{
// Streams the raw JSON of the projected aggregate to the HTTP response
// without deserialization/serialization when the projection is stored inline
return session.Events.WriteLatest<Order>(orderId, HttpContext);
}

#endregion

#region sample_write_latest_aggregate_by_string_to_httpresponse

[HttpGet("/named-order/{orderId}")]
public Task GetNamedOrder(string orderId, [FromServices] IDocumentSession session)
{
return session.Events.WriteLatest<NamedOrder>(orderId, HttpContext);
}

#endregion
}
1 change: 1 addition & 0 deletions src/IssueService/MartenSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ public class MartenSettings
{
public const string SECTION = "Marten";
public string SchemaName { get; set; }
public bool UseStringStreamIdentity { get; set; }
}
#endregion
13 changes: 13 additions & 0 deletions src/IssueService/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using IssueService.Controllers;
using JasperFx.Events;
using Marten;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
Expand Down Expand Up @@ -42,6 +45,16 @@ public void ConfigureServices(IServiceCollection services)
options.DatabaseSchemaName = martenSettings.SchemaName;
}

if (martenSettings.UseStringStreamIdentity)
{
options.Events.StreamIdentity = StreamIdentity.AsString;
options.Projections.Snapshot<NamedOrder>(SnapshotLifecycle.Inline);
}
else
{
options.Projections.Snapshot<Order>(SnapshotLifecycle.Inline);
}

return options;
}).UseLightweightSessions();
#endregion
Expand Down
41 changes: 41 additions & 0 deletions src/Marten.AspNetCore.Testing/StringStreamAppFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Threading.Tasks;
using Alba;
using IssueService;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

namespace Marten.AspNetCore.Testing;

public class StringStreamAppFixture: IAsyncLifetime
{
private string SchemaName { get; } = "sch" + Guid.NewGuid().ToString().Replace("-", string.Empty);
public IAlbaHost Host { get; private set; }

public async Task InitializeAsync()
{
Host = await AlbaHost.For<Program>(b =>
{
b.ConfigureServices((context, services) =>
{
services.MartenDaemonModeIsSolo();

services.Configure<MartenSettings>(s =>
{
s.SchemaName = SchemaName;
s.UseStringStreamIdentity = true;
});
});
});
}

public async Task DisposeAsync()
{
await Host.DisposeAsync();
}
}

[CollectionDefinition("string_stream_integration")]
public class StringStreamIntegrationCollection: ICollectionFixture<StringStreamAppFixture>
{
}
Loading
Loading