diff --git a/src/Dapr.AspNetCore/CloudEventsMiddleware.cs b/src/Dapr.AspNetCore/CloudEventsMiddleware.cs index b9b64b72c..4a02d40f4 100644 --- a/src/Dapr.AspNetCore/CloudEventsMiddleware.cs +++ b/src/Dapr.AspNetCore/CloudEventsMiddleware.cs @@ -48,28 +48,22 @@ public CloudEventsMiddleware(RequestDelegate next, CloudEventsMiddlewareOptions this.options = options; } - public Task InvokeAsync(HttpContext httpContext) + public async Task InvokeAsync(HttpContext httpContext) { - // This middleware unwraps any requests with a cloud events (JSON) content type - // and replaces the request body + request content type so that it can be read by a - // non-cloud-events-aware piece of code. - // - // This corresponds to cloud events in the *structured* format: - // https://github.com/cloudevents/spec/blob/master/http-transport-binding.md#13-content-modes - // - // For *binary* format, we don't have to do anything - // - // We don't support batching. - // - // The philosophy here is that we don't report an error for things we don't support, because - // that would block someone from implementing their own support for it. We only report an error - // when something we do support isn't correct. if (!MatchesContentType(httpContext, out var charSet)) { - return this.next(httpContext); + await this.next(httpContext); + return; } - return this.ProcessBodyAsync(httpContext, charSet); + try + { + await this.ProcessBodyAsync(httpContext, charSet); + } + catch (OperationCanceledException) when (httpContext.RequestAborted.IsCancellationRequested && httpContext.Response.HasStarted) + { + // Swallow + } } private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) @@ -77,7 +71,7 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) JsonElement json; if (string.Equals(charSet, Encoding.UTF8.WebName, StringComparison.OrdinalIgnoreCase)) { - json = await JsonSerializer.DeserializeAsync(httpContext.Request.Body); + json = await JsonSerializer.DeserializeAsync(httpContext.Request.Body, cancellationToken: httpContext.RequestAborted); } else { @@ -142,7 +136,7 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet) if (isJson || options.SuppressJsonDecodingOfTextPayloads) { // Rehydrate body from JSON payload - await JsonSerializer.SerializeAsync(body, data); + await JsonSerializer.SerializeAsync(body, data, cancellationToken: httpContext.RequestAborted); } else { diff --git a/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs b/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs index 539c97a80..26b70c988 100644 --- a/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs +++ b/test/Dapr.AspNetCore.Test/CloudEventsMiddlewareTest.cs @@ -20,9 +20,12 @@ namespace Dapr.AspNetCore.Test; using System.Text; using System.Text.Json; using System.Threading.Tasks; +using System.Threading; +using System; using Shouldly; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Http.Features; using Xunit; public class CloudEventsMiddlewareTest @@ -478,4 +481,83 @@ private static string ReadBody(Stream stream, Encoding encoding = null) var str = encoding.GetString(bytes); return str; } + + [Fact] + public async Task InvokeAsync_SwallowsCancellation_WhenResponseHasStarted() + { + using var cts = new CancellationTokenSource(); + + var serviceCollection = new ServiceCollection(); + var provider = serviceCollection.BuildServiceProvider(); + + var app = new ApplicationBuilder(provider); + app.UseCloudEvents(); + + // Register terminal middleware BEFORE building the pipeline + app.Run(httpContext => + { + // Simulate that the response has already started by replacing the response feature + httpContext.Features.Set(new TestHttpResponseFeature { HasStarted = true }); + + // Simulate the client disconnecting after the response started + cts.Cancel(); + throw new OperationCanceledException(cts.Token); + }); + + var pipeline = app.Build(); + + var context = new DefaultHttpContext(); + context.Request.ContentType = "application/cloudevents+json"; + context.Request.Body = MakeBody("{ \"data\": { \"name\":\"jimmy\" } }"); + context.RequestAborted = cts.Token; + + // The middleware should catch and swallow the OperationCanceledException + // because the response has already started and the request was aborted + await pipeline.Invoke(context); + } + + [Fact] + public async Task InvokeAsync_PropagatesCancellation_WhenResponseHasNotStarted() + { + using var cts = new CancellationTokenSource(); + + var serviceCollection = new ServiceCollection(); + var provider = serviceCollection.BuildServiceProvider(); + + var app = new ApplicationBuilder(provider); + app.UseCloudEvents(); + + // Register terminal middleware that cancels BEFORE the response starts + app.Run(httpContext => + { + // Response has NOT started (default), client disconnects + cts.Cancel(); + throw new OperationCanceledException(cts.Token); + }); + + var pipeline = app.Build(); + + var context = new DefaultHttpContext(); + context.Request.ContentType = "application/cloudevents+json"; + context.Request.Body = MakeBody("{ \"data\": { \"name\":\"jimmy\" } }"); + context.RequestAborted = cts.Token; + + // The middleware should NOT swallow the exception when the response has not started + await Should.ThrowAsync(() => pipeline.Invoke(context)); + } + + /// + /// A test implementation of that allows controlling + /// the property for unit testing. + /// + private sealed class TestHttpResponseFeature : IHttpResponseFeature + { + public int StatusCode { get; set; } = 200; + public string ReasonPhrase { get; set; } + public IHeaderDictionary Headers { get; set; } = new HeaderDictionary(); + public Stream Body { get; set; } = new MemoryStream(); + public bool HasStarted { get; set; } + public void OnStarting(Func callback, object state) { } + public void OnCompleted(Func callback, object state) { } + } }