From b0c855be3b86b5083260a8e07900ebdae4dd8c1a Mon Sep 17 00:00:00 2001 From: Darren Cunningham Date: Mon, 9 Sep 2024 11:09:00 -0400 Subject: [PATCH] feat(streams): Use Cursor (#160) --- Fauna.Test/Integration.Tests.cs | 106 +++++++++++++++++++++++++++++++- Fauna/Client.cs | 2 +- Fauna/Core/Connection.cs | 82 ++++++++++++++++-------- Fauna/Core/ResponseFields.cs | 5 ++ Fauna/Core/StreamEnumerable.cs | 2 + Fauna/Core/StreamOptions.cs | 42 +++++++++++++ Fauna/IClient.cs | 26 +++++--- Fauna/Types/Event.cs | 13 ++++ Fauna/Types/Stream.cs | 10 ++- 9 files changed, 250 insertions(+), 38 deletions(-) create mode 100644 Fauna/Core/StreamOptions.cs diff --git a/Fauna.Test/Integration.Tests.cs b/Fauna.Test/Integration.Tests.cs index ccc289b8..9aebfb2a 100644 --- a/Fauna.Test/Integration.Tests.cs +++ b/Fauna.Test/Integration.Tests.cs @@ -341,7 +341,7 @@ public async Task CanReadEventsFomStream() { Assert.Multiple(() => { - Assert.NotZero(evt.TxnTime, "should have a txn time"); + Assert.IsNotEmpty(evt.Cursor, "should have a cursor"); Assert.NotZero(evt.Stats.ReadOps, "should have consumed ReadOps"); if (evt.Type is EventType.Status) { @@ -386,4 +386,108 @@ public async Task CanReadEventsFomStream() await Task.CompletedTask; } + + [Test] + [Category("Streaming")] + public Task StreamThrowsWithBadRequest() + { + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test + + var ex = Assert.ThrowsAsync(async () => + { + var stream = await _client.EventStreamAsync(FQL($"StreamingSandbox.all().toStream()"), + streamOptions: new StreamOptions("fake", "abc1234=="), + cancellationToken: cts.Token); + + await foreach (var _ in stream) + { + Assert.Fail("Should not process events"); + } + }); + + Assert.AreEqual("BadRequest: Bad Request", ex?.Message); + + return Task.CompletedTask; + } + + [Test] + [Category("Streaming")] + public async Task CanResumeStreamWithStreamOptions() + { + string? token = null; + string? cursor = null; + + var queries = new[] + { + FQL($"StreamingSandbox.create({{ foo: 'bar' }})"), + FQL($"StreamingSandbox.all().forEach(.update({{ foo: 'baz' }}))"), + FQL($"StreamingSandbox.all().forEach(.delete())") + }; + + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test + cts.Token.ThrowIfCancellationRequested(); + + int expectedEvents = queries.Length + 1; + + // create a task to open the stream and process events + var streamTask = Task.Run(() => + { + Assert.DoesNotThrowAsync(async () => + { + var stream = await _client.EventStreamAsync( + FQL($"StreamingSandbox.all().toStream()"), + cancellationToken: cts.Token + ); + Assert.NotNull(stream); + token = stream.Token; + + await foreach (var evt in stream) + { + // break after the first received event + cursor = evt.Cursor; + // ReSharper disable once AccessToModifiedClosure + expectedEvents--; + break; + } + }); + }, cts.Token); + + // invoke queries on a delay to simulate streaming events + var queryTasks = queries.Select( + (query, index) => Task.Delay((index + 1) * 500, cts.Token) + .ContinueWith( + _ => + { + Assert.DoesNotThrowAsync(async () => { await _client.QueryAsync(query, cancel: cts.Token); }, + "Should successfully invoke query"); + }, TaskContinuationOptions.ExecuteSynchronously)); + + // wait for all tasks + queryTasks = queryTasks.Append(streamTask); + Task.WaitAll(queryTasks.ToArray(), cts.Token); + + Assert.NotNull(token, "should have a token"); + Assert.NotNull(cursor, "should have a cursor from the first event"); + + var stream = await _client.EventStreamAsync( + FQL($"StreamingSandbox.all().toStream()"), + streamOptions: new StreamOptions(token!, cursor!), + cancellationToken: cts.Token + ); + Assert.NotNull(stream); + + await foreach (var evt in stream) + { + Assert.IsNotEmpty(evt.Cursor, "should have a cursor"); + expectedEvents--; + if (expectedEvents > 0) + { + continue; + } + + break; + } + + Assert.Zero(expectedEvents, "stream handler should process all events"); + } } diff --git a/Fauna/Client.cs b/Fauna/Client.cs index 84a700aa..2dc406f3 100644 --- a/Fauna/Client.cs +++ b/Fauna/Client.cs @@ -145,7 +145,7 @@ internal override async IAsyncEnumerator> SubscribeStreamInternal( cancel)) { LastSeenTxn = evt.TxnTime; - stream.StartTs = evt.TxnTime; + stream.LastCursor = evt.Cursor; StatsCollector?.Add(evt.Stats); yield return evt; } diff --git a/Fauna/Core/Connection.cs b/Fauna/Core/Connection.cs index 2b0a1721..9c90b753 100644 --- a/Fauna/Core/Connection.cs +++ b/Fauna/Core/Connection.cs @@ -1,7 +1,10 @@ using System.Collections.Concurrent; +using System.Net; using System.Net.Http.Headers; using System.Runtime.CompilerServices; +using Fauna.Exceptions; using Fauna.Mapping; +using Fauna.Serialization; using Fauna.Types; using Polly; using Stream = System.IO.Stream; @@ -57,44 +60,62 @@ public async IAsyncEnumerable> OpenStream( while (!cancellationToken.IsCancellationRequested) { var listener = new EventListener(); - Task streamTask = _cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () => - { - var streamData = new MemoryStream(); - stream.Serialize(streamData); - - var response = await _cfg.HttpClient - .SendAsync( - CreateHttpRequest(path, streamData, headers), - HttpCompletionOption.ResponseHeadersRead, - cancellationToken) - .ConfigureAwait(false); + Task> streamTask = + _cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () => + { + var streamData = new MemoryStream(); + stream.Serialize(streamData); - await using var streamAsync = await response.Content.ReadAsStreamAsync(cancellationToken); - using var streamReader = new StreamReader(streamAsync); + var response = await _cfg.HttpClient + .SendAsync( + CreateHttpRequest(path, streamData, headers), + HttpCompletionOption.ResponseHeadersRead, + cancellationToken) + .ConfigureAwait(false); - while (!streamReader.EndOfStream && !cancellationToken.IsCancellationRequested) - { - string? line = await streamReader.ReadLineAsync().WaitAsync(cancellationToken); - if (string.IsNullOrWhiteSpace(line)) + if (!response.IsSuccessStatusCode) { - continue; + listener.BreakAndClose(); + return response; } - var evt = Event.From(line, ctx); - stream.StartTs = evt.TxnTime; - listener.Dispatch(evt); - } + await using var streamAsync = await response.Content.ReadAsStreamAsync(cancellationToken); + using var streamReader = new StreamReader(streamAsync); - listener.Close(); - return response; - }); + while (!streamReader.EndOfStream && !cancellationToken.IsCancellationRequested) + { + string? line = await streamReader.ReadLineAsync().WaitAsync(cancellationToken); + if (string.IsNullOrWhiteSpace(line)) + { + continue; + } + + var evt = Event.From(line, ctx); + stream.LastCursor = evt.Cursor; + listener.Dispatch(evt); + } + + listener.Close(); + return response; + }); await foreach (var evt in listener.Events().WithCancellation(cancellationToken)) { + if (evt is null) break; + yield return evt; } await streamTask; + if (streamTask.Result.Result.IsSuccessStatusCode) + { + continue; + } + + var httpResponse = streamTask.Result.Result; + string body = await httpResponse.Content.ReadAsStringAsync(cancellationToken); + + throw ExceptionFactory.FromRawResponse(body, httpResponse); } } @@ -104,7 +125,7 @@ public async IAsyncEnumerable> OpenStream( /// The type of event data. private class EventListener where T : notnull { - private readonly ConcurrentQueue> _queue = new(); + private readonly ConcurrentQueue?> _queue = new(); private readonly SemaphoreSlim _semaphore = new(0); private bool _closed; @@ -114,7 +135,14 @@ public void Dispatch(Event evt) _semaphore.Release(); } - public async IAsyncEnumerable> Events() + public void BreakAndClose() + { + _queue.Enqueue(null); + _semaphore.Release(); + Close(); + } + + public async IAsyncEnumerable?> Events() { while (true) { diff --git a/Fauna/Core/ResponseFields.cs b/Fauna/Core/ResponseFields.cs index 193aa257..1e02f0e9 100644 --- a/Fauna/Core/ResponseFields.cs +++ b/Fauna/Core/ResponseFields.cs @@ -17,6 +17,11 @@ internal readonly struct ResponseFields /// public const string LastSeenTxnFieldName = "txn_ts"; + /// + /// Field name for the stream cursor of the response. + /// + public const string CursorFieldName = "cursor"; + /// /// Field name for static type information in the response. /// diff --git a/Fauna/Core/StreamEnumerable.cs b/Fauna/Core/StreamEnumerable.cs index 3beff9f7..21f46f2b 100644 --- a/Fauna/Core/StreamEnumerable.cs +++ b/Fauna/Core/StreamEnumerable.cs @@ -9,6 +9,8 @@ public class StreamEnumerable where T : notnull private readonly Stream _stream; private readonly CancellationToken _cancel; + public string Token => _stream.Token; + internal StreamEnumerable( BaseClient client, Stream stream, diff --git a/Fauna/Core/StreamOptions.cs b/Fauna/Core/StreamOptions.cs new file mode 100644 index 00000000..f43cf348 --- /dev/null +++ b/Fauna/Core/StreamOptions.cs @@ -0,0 +1,42 @@ +namespace Fauna; + +/// +/// Represents the options when subscribing to Fauna Streams. +/// +public class StreamOptions +{ + /// + /// Initializes a new instance of the class with the specified token and cursor. + /// + /// The token returned from Fauna when the stream is created. + /// The cursor from the stream, must be used with the associated Token. Used to resume the stream. + /// + public StreamOptions(string token, string cursor) + { + Token = token; + Cursor = cursor; + } + + /// + /// Initializes a new instance of the class with the specified token and start timestamp. + /// + /// The token returned from Fauna when the stream is created. + /// The start timestamp to use for the stream. + public StreamOptions(string token, long startTs) + { + Token = token; + StartTs = startTs; + } + + // Token returned from Fauna when the stream is created. + /// + public string? Token { get; } + + /// Cursor from the stream, must be used with the associated Token. Used to resume the stream. + /// + public string? Cursor { get; } + + // Start timestamp from the stream, must be used with the associated Token. Used to resume the stream. + /// + public long? StartTs { get; } +} diff --git a/Fauna/IClient.cs b/Fauna/IClient.cs index 8c7f90c6..adb243b3 100644 --- a/Fauna/IClient.cs +++ b/Fauna/IClient.cs @@ -337,7 +337,6 @@ public IAsyncEnumerable> PaginateAsync( ISerializer elemSerializer, QueryOptions? queryOptions = null, CancellationToken cancel = default); - } /// @@ -345,7 +344,9 @@ public IAsyncEnumerable> PaginateAsync( /// public abstract class BaseClient : IClient { - internal BaseClient() { } + internal BaseClient() + { + } internal abstract MappingContext MappingCtx { get; } @@ -520,22 +521,33 @@ internal abstract IAsyncEnumerator> SubscribeStreamInternal( /// Event Data will be deserialized to this type. /// The query to create the stream from Fauna. /// The options for the query. + /// The options for the stream. /// The cancellation token. /// A task that represents the asynchronous operation. The task result contains a stream of events. public async Task> EventStreamAsync( Query query, QueryOptions? queryOptions = null, + StreamOptions? streamOptions = null, CancellationToken cancellationToken = default) where T : notnull + { + Stream stream = streamOptions?.Token != null + ? new Stream(streamOptions.Token) { LastCursor = streamOptions.Cursor, StartTs = streamOptions.StartTs } + : await GetStreamFromQueryAsync(query, queryOptions, cancellationToken); + + return new StreamEnumerable(this, stream, cancellationToken); + } + + private async Task GetStreamFromQueryAsync( + Query query, + QueryOptions? queryOptions, + CancellationToken cancellationToken) { var response = await QueryAsync( query, queryOptions, cancellationToken); - return new StreamEnumerable( - this, - response.Data, - cancellationToken); + return response.Data; } /// @@ -543,7 +555,7 @@ public async Task> EventStreamAsync( /// /// Event Data will be deserialized to this type. /// The stream to subscribe to. - /// Mapping context for stream.Mapping context for stream. /// The cancellation token. /// An async enumerator of stream events. public IAsyncEnumerator> SubscribeStream( diff --git a/Fauna/Types/Event.cs b/Fauna/Types/Event.cs index 966c7d1b..258aa478 100644 --- a/Fauna/Types/Event.cs +++ b/Fauna/Types/Event.cs @@ -20,6 +20,7 @@ public class Event where T : notnull { public EventType Type { get; private init; } public long TxnTime { get; private init; } + public string Cursor { get; private init; } = null!; public T? Data { get; private init; } public QueryStats Stats { get; private init; } @@ -36,6 +37,7 @@ public static Event From(string body, MappingContext ctx) var evt = new Event { TxnTime = GetTxnTime(json), + Cursor = GetCursor(json), Type = GetType(json), Stats = GetStats(json), Data = GetData(json, ctx), @@ -54,6 +56,17 @@ private static long GetTxnTime(JsonElement json) return elem.TryGetInt64(out long i) ? i : default; } + private static string GetCursor(JsonElement json) + { + if (!json.TryGetProperty(CursorFieldName, out var elem)) + { + throw new InvalidDataException($"Missing required field: cursor - {json.ToString()}"); + } + + return elem.Deserialize()!; + } + + private static EventType GetType(JsonElement json) { if (!json.TryGetProperty("type", out var elem)) diff --git a/Fauna/Types/Stream.cs b/Fauna/Types/Stream.cs index bc790697..8e6521ab 100644 --- a/Fauna/Types/Stream.cs +++ b/Fauna/Types/Stream.cs @@ -15,16 +15,22 @@ public Stream(string token) /// /// Gets the string value of the stream token. /// - private string Token { get; } + internal string Token { get; } public long? StartTs { get; set; } + public string? LastCursor { get; set; } + public void Serialize(System.IO.Stream stream) { var writer = new Utf8JsonWriter(stream); writer.WriteStartObject(); writer.WriteString("token", Token); - if (StartTs != null) + if (LastCursor != null) + { + writer.WriteString("cursor", LastCursor); + } + else if (StartTs != null) { writer.WriteNumber("start_ts", StartTs.Value); }