Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
<PackageVersion Include="Grpc.Tools" Version="2.72.0" />
<PackageVersion Include="HtmlTags" Version="9.0.0" />
<PackageVersion Include="JasperFx" Version="1.24.1" />
<PackageVersion Include="JasperFx.Events" Version="1.27.0" />
<PackageVersion Include="JasperFx.Events" Version="1.28.1" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.4.0" />
<PackageVersion Include="JasperFx.SourceGeneration" Version="1.1.0" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.1" />
<PackageVersion Include="Marten" Version="8.28.0" />
<PackageVersion Include="Marten" Version="8.31.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.1.3" />
<PackageVersion Include="Polecat" Version="1.6.1" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageVersion Include="Marten.AspNetCore" Version="8.28.0" />
<PackageVersion Include="Marten.AspNetCore" Version="8.31.0" />
<PackageVersion Include="MemoryPack" Version="1.21.3" />
<PackageVersion Include="MessagePack" Version="3.1.3" />
<PackageVersion Include="Meziantou.Extensions.Logging.Xunit" Version="1.0.15" />
Expand Down
97 changes: 97 additions & 0 deletions docs/guide/http/marten.md
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,100 @@ public class ApprovedInvoicedCompiledQuery : ICompiledListQuery<Invoice>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Marten/Documents.cs#L109-L119' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_compiled_query_return_query' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Streaming JSON Responses <Badge type="tip" text="5.32" />

[`Marten.AspNetCore`](https://martendb.io/documents/aspnetcore.html) ships three
typed return values — `StreamOne<T>`, `StreamMany<T>`, and `StreamAggregate<T>` —
that write Marten's raw JSON directly to the HTTP response. The JSON never
round-trips through a .NET object and the framework's serializer, so there's no
deserialize/serialize overhead.

Each type also supplies correct OpenAPI metadata (`Produces<T>`, `Produces(404)`
where appropriate) via `IEndpointMetadataProvider`, so Swashbuckle, NSwag, and
Minimal-API's built-in OpenAPI generator all see the right response shape.

The types implement `IResult`, so Wolverine.Http dispatches them through its
existing `ResultWriterPolicy` — **no extra Wolverine-specific configuration is
needed**. Just `using Marten.AspNetCore;` in your endpoint file and return one.

### When to use which

| Type | Source | Shape returned | 404? |
| ---------------------- | ------------------------------------------------ | -------------- | ---- |
| `StreamOne<T>` | `IQueryable<T>` — regular Marten document query | Single `T` | yes |
| `StreamMany<T>` | `IQueryable<T>` — regular Marten document query | JSON array `T[]` | no (empty array = 200) |
| `StreamAggregate<T>` | `IDocumentSession` + stream id — event-sourced | Single `T` | yes |

**Key difference — `StreamOne<T>` vs `StreamAggregate<T>`**:

- **`StreamOne<T>`** is for regular Marten documents — plain objects persisted via
`session.Store()` and queried with `session.Query<T>()`. The query hits the
document table directly.
- **`StreamAggregate<T>`** is for event-sourced aggregates. Marten rebuilds the
latest aggregate state by folding events from the event store (or reads a
projected snapshot if you have one configured). Use this when `T` is an
event-sourced aggregate, not a stored document.

### `StreamOne<T>` — single document with 404 on miss

```csharp
using Marten.AspNetCore;

[WolverineGet("/invoices/{id}")]
public static StreamOne<Invoice> Get(Guid id, IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Id == id));
```

Returns `200 application/json` with the JSON body on a hit, `404` on a miss.
`Content-Length` and `Content-Type` are set automatically.

### `StreamMany<T>` — JSON array

```csharp
[WolverineGet("/invoices/approved")]
public static StreamMany<Invoice> Approved(IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Approved));
```

Returns `200 application/json` with a JSON array body. An empty result set
returns `[]`, not `404`.

### `StreamAggregate<T>` — event-sourced aggregate (latest)

```csharp
[WolverineGet("/orders/{id}")]
public static StreamAggregate<Order> Get(Guid id, IDocumentSession session)
=> new(session, id);
```

Returns `200 application/json` with the JSON of the latest projected aggregate
state, or `404` if no stream exists for the supplied id. The constructor also
accepts `string` ids for stores configured with string-keyed streams.

### Customizing status code and content type

All three types expose init-only properties for overriding defaults:

```csharp
[WolverinePost("/invoices")]
public static StreamOne<Invoice> Create(CreateInvoice cmd, IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Id == cmd.InvoiceId))
{
OnFoundStatus = StatusCodes.Status201Created,
ContentType = "application/vnd.myapi.invoice+json"
};
```

### When to prefer streaming over returning `T`

Reach for these types when:

- The response is large (big documents, long arrays) — avoids allocating the
deserialized graph and re-serializing it
- You need fine-grained control over status code and content type without
wrapping in `IResult`
- You want a concise, typed endpoint signature that still produces accurate
OpenAPI metadata

For small responses where the query result is already going to be materialized
(to make a decision, for example), a plain `T` return is fine.
217 changes: 217 additions & 0 deletions src/Http/Wolverine.Http.Tests/Marten/streaming_endpoints.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
using Alba;
using Marten;
using Marten.AspNetCore;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Metadata;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using Shouldly;
using WolverineWebApi.Marten;

namespace Wolverine.Http.Tests.Marten;

/// <summary>
/// Tests for <see cref="StreamOne{T}"/>, <see cref="StreamMany{T}"/>, and
/// <see cref="StreamAggregate{T}"/> from <c>Marten.AspNetCore</c>. Wolverine.Http
/// dispatches these as ordinary <c>IResult</c> return values via the existing
/// <c>ResultWriterPolicy</c> — no Wolverine-specific code required. GH-1562.
/// </summary>
public class streaming_endpoints(AppFixture fixture) : IntegrationContext(fixture)
{
// ───────────────────────── StreamOne<T> ─────────────────────────

[Fact]
public async Task stream_one_returns_matching_document_as_json()
{
var invoice = new Invoice { Id = Guid.NewGuid(), Approved = true };
await using (var session = Store.LightweightSession())
{
session.Store(invoice);
await session.SaveChangesAsync();
}

var body = await Host.GetAsJson<Invoice>($"/streaming/invoice/{invoice.Id}");

body.ShouldNotBeNull();
body.Id.ShouldBe(invoice.Id);
body.Approved.ShouldBeTrue();
}

[Fact]
public async Task stream_one_sets_content_type_and_status_on_hit()
{
var invoice = new Invoice { Id = Guid.NewGuid() };
await using (var session = Store.LightweightSession())
{
session.Store(invoice);
await session.SaveChangesAsync();
}

var result = await Host.Scenario(x =>
{
x.Get.Url($"/streaming/invoice/{invoice.Id}");
x.StatusCodeShouldBe(200);
x.ContentTypeShouldBe("application/json");
});

// Content-Length should be set (single-document streaming buffers in memory)
result.Context.Response.ContentLength.HasValue.ShouldBeTrue();
}

[Fact]
public async Task stream_one_returns_404_when_no_match()
{
await Host.Scenario(x =>
{
x.Get.Url($"/streaming/invoice/{Guid.NewGuid()}");
x.StatusCodeShouldBe(404);
});
}

[Fact]
public async Task stream_one_respects_custom_on_found_status()
{
var invoice = new Invoice { Id = Guid.NewGuid() };
await using (var session = Store.LightweightSession())
{
session.Store(invoice);
await session.SaveChangesAsync();
}

await Host.Scenario(x =>
{
x.Get.Url($"/streaming/invoice/{invoice.Id}/custom-status");
x.StatusCodeShouldBe(202);
});
}

[Fact]
public async Task stream_one_respects_custom_content_type()
{
var invoice = new Invoice { Id = Guid.NewGuid() };
await using (var session = Store.LightweightSession())
{
session.Store(invoice);
await session.SaveChangesAsync();
}

await Host.Scenario(x =>
{
x.Get.Url($"/streaming/invoice/{invoice.Id}/custom-content-type");
x.StatusCodeShouldBe(200);
x.ContentTypeShouldBe("application/vnd.wolverine.invoice+json");
});
}

// ───────────────────────── StreamMany<T> ─────────────────────────

[Fact]
public async Task stream_many_returns_json_array()
{
// Use a distinct Approved=true batch keyed by this test's invoices so assertions
// are stable even across concurrent test runs against a shared document store.
var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };
await using (var session = Store.LightweightSession())
{
foreach (var id in ids) session.Store(new Invoice { Id = id, Approved = true });
await session.SaveChangesAsync();
}

var body = await Host.GetAsJson<List<Invoice>>("/streaming/invoices/approved");

body.ShouldNotBeNull();
foreach (var id in ids)
body.ShouldContain(x => x.Id == id);
}

[Fact]
public async Task stream_many_returns_empty_array_when_no_match_not_404()
{
var result = await Host.Scenario(x =>
{
x.Get.Url("/streaming/invoices/none");
x.StatusCodeShouldBe(200);
x.ContentTypeShouldBe("application/json");
});

var body = result.ReadAsText();
body.Trim().ShouldBe("[]");
}

// ───────────────────── StreamAggregate<T> ─────────────────────

[Fact]
public async Task stream_aggregate_returns_latest_aggregate_as_json()
{
// Use the existing /orders/create endpoint to set up an event-sourced Order
var created = await Host.Scenario(x =>
{
x.Post.Json(new StartOrder(["Socks", "Shoes"])).ToUrl("/orders/create");
});

var status = created.ReadAsJson<OrderStatus>();

var body = await Host.GetAsJson<Order>($"/streaming/order/{status.OrderId}");

body.ShouldNotBeNull();
body.Id.ShouldBe(status.OrderId);
body.Items.Keys.OrderBy(x => x).ShouldBe(new[] { "Shoes", "Socks" });
}

[Fact]
public async Task stream_aggregate_returns_404_for_unknown_id()
{
await Host.Scenario(x =>
{
x.Get.Url($"/streaming/order/{Guid.NewGuid()}");
x.StatusCodeShouldBe(404);
});
}

// ───────────────────────── OpenAPI metadata ─────────────────────────

[Fact]
public void stream_one_endpoint_advertises_produces_T_and_404_in_metadata()
{
var metadata = EndpointMetadataFor("GET", "/streaming/invoice/{id}");

metadata.OfType<IProducesResponseTypeMetadata>()
.ShouldContain(m => m.StatusCode == 200 && m.Type == typeof(Invoice));

metadata.OfType<IProducesResponseTypeMetadata>()
.ShouldContain(m => m.StatusCode == 404);
}

[Fact]
public void stream_many_endpoint_advertises_produces_array_in_metadata()
{
var metadata = EndpointMetadataFor("GET", "/streaming/invoices/approved");

metadata.OfType<IProducesResponseTypeMetadata>()
.ShouldContain(m => m.StatusCode == 200 && m.Type == typeof(IReadOnlyList<Invoice>));
}

[Fact]
public void stream_aggregate_endpoint_advertises_produces_T_and_404_in_metadata()
{
var metadata = EndpointMetadataFor("GET", "/streaming/order/{id}");

metadata.OfType<IProducesResponseTypeMetadata>()
.ShouldContain(m => m.StatusCode == 200 && m.Type == typeof(Order));
metadata.OfType<IProducesResponseTypeMetadata>()
.ShouldContain(m => m.StatusCode == 404);
}

private EndpointMetadataCollection EndpointMetadataFor(string method, string pattern)
{
var dataSource = Host.Services.GetServices<EndpointDataSource>()
.SelectMany(x => x.Endpoints)
.OfType<RouteEndpoint>()
.FirstOrDefault(x =>
x.RoutePattern.RawText == pattern &&
x.Metadata.GetMetadata<HttpMethodMetadata>()!.HttpMethods.Contains(method));

dataSource.ShouldNotBeNull($"No endpoint found for {method} {pattern}");
return dataSource.Metadata;
}
}
51 changes: 51 additions & 0 deletions src/Http/WolverineWebApi/Marten/StreamingEndpoints.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Marten;
using Marten.AspNetCore;
using Wolverine.Http;

namespace WolverineWebApi.Marten;

/// <summary>
/// Endpoints exercising the <see cref="StreamOne{T}"/>, <see cref="StreamMany{T}"/>,
/// and <see cref="StreamAggregate{T}"/> helpers from <c>Marten.AspNetCore</c>.
/// Used by the streaming_endpoints tests for GH-1562. Wolverine.Http dispatches
/// these as ordinary <c>IResult</c> return values via the existing
/// <c>ResultWriterPolicy</c> — no Wolverine-specific code needed.
/// </summary>
public static class StreamingEndpoints
{
// StreamOne - single document, 404 if not found
[WolverineGet("/streaming/invoice/{id}")]
public static StreamOne<Invoice> GetOne(Guid id, IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Id == id));

// StreamOne with custom OnFoundStatus
[WolverineGet("/streaming/invoice/{id}/custom-status")]
public static StreamOne<Invoice> GetOneCreated(Guid id, IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Id == id))
{
OnFoundStatus = StatusCodes.Status202Accepted
};

// StreamOne with custom content type
[WolverineGet("/streaming/invoice/{id}/custom-content-type")]
public static StreamOne<Invoice> GetOneVendorType(Guid id, IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Id == id))
{
ContentType = "application/vnd.wolverine.invoice+json"
};

// StreamMany - JSON array
[WolverineGet("/streaming/invoices/approved")]
public static StreamMany<Invoice> GetApproved(IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Approved));

// StreamMany with no matches - returns empty array, not 404
[WolverineGet("/streaming/invoices/none")]
public static StreamMany<Invoice> GetNone(IQuerySession session)
=> new(session.Query<Invoice>().Where(x => x.Id == Guid.Empty));

// StreamAggregate - event-sourced aggregate, latest state
[WolverineGet("/streaming/order/{id}")]
public static StreamAggregate<Order> GetOrder(Guid id, IDocumentSession session)
=> new(session, id);
}
Loading