Skip to content

Commit

Permalink
feat(streams): Use Cursor (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
cynicaljoy authored Sep 9, 2024
1 parent 4e1d1ef commit b0c855b
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 38 deletions.
106 changes: 105 additions & 1 deletion Fauna.Test/Integration.Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public async Task CanReadEventsFomStream()
{
Assert.Multiple(() =>
{
Assert.NotZero(evt.TxnTime, "should have a txn time");
Assert.IsNotEmpty(evt.Cursor, "should have a cursor");
Assert.NotZero(evt.Stats.ReadOps, "should have consumed ReadOps");
if (evt.Type is EventType.Status)
{
Expand Down Expand Up @@ -386,4 +386,108 @@ public async Task CanReadEventsFomStream()

await Task.CompletedTask;
}

[Test]
[Category("Streaming")]
public Task StreamThrowsWithBadRequest()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test

var ex = Assert.ThrowsAsync<FaunaException>(async () =>
{
var stream = await _client.EventStreamAsync<StreamingSandbox>(FQL($"StreamingSandbox.all().toStream()"),
streamOptions: new StreamOptions("fake", "abc1234=="),
cancellationToken: cts.Token);

await foreach (var _ in stream)
{
Assert.Fail("Should not process events");
}
});

Assert.AreEqual("BadRequest: Bad Request", ex?.Message);

return Task.CompletedTask;
}

[Test]
[Category("Streaming")]
public async Task CanResumeStreamWithStreamOptions()
{
string? token = null;
string? cursor = null;

var queries = new[]
{
FQL($"StreamingSandbox.create({{ foo: 'bar' }})"),
FQL($"StreamingSandbox.all().forEach(.update({{ foo: 'baz' }}))"),
FQL($"StreamingSandbox.all().forEach(.delete())")
};

var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test
cts.Token.ThrowIfCancellationRequested();

int expectedEvents = queries.Length + 1;

// create a task to open the stream and process events
var streamTask = Task.Run(() =>
{
Assert.DoesNotThrowAsync(async () =>
{
var stream = await _client.EventStreamAsync<StreamingSandbox>(
FQL($"StreamingSandbox.all().toStream()"),
cancellationToken: cts.Token
);
Assert.NotNull(stream);
token = stream.Token;

await foreach (var evt in stream)
{
// break after the first received event
cursor = evt.Cursor;
// ReSharper disable once AccessToModifiedClosure
expectedEvents--;
break;
}
});
}, cts.Token);

// invoke queries on a delay to simulate streaming events
var queryTasks = queries.Select(
(query, index) => Task.Delay((index + 1) * 500, cts.Token)
.ContinueWith(
_ =>
{
Assert.DoesNotThrowAsync(async () => { await _client.QueryAsync(query, cancel: cts.Token); },
"Should successfully invoke query");
}, TaskContinuationOptions.ExecuteSynchronously));

// wait for all tasks
queryTasks = queryTasks.Append(streamTask);
Task.WaitAll(queryTasks.ToArray(), cts.Token);

Assert.NotNull(token, "should have a token");
Assert.NotNull(cursor, "should have a cursor from the first event");

var stream = await _client.EventStreamAsync<StreamingSandbox>(
FQL($"StreamingSandbox.all().toStream()"),
streamOptions: new StreamOptions(token!, cursor!),
cancellationToken: cts.Token
);
Assert.NotNull(stream);

await foreach (var evt in stream)
{
Assert.IsNotEmpty(evt.Cursor, "should have a cursor");
expectedEvents--;
if (expectedEvents > 0)
{
continue;
}

break;
}

Assert.Zero(expectedEvents, "stream handler should process all events");
}
}
2 changes: 1 addition & 1 deletion Fauna/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ internal override async IAsyncEnumerator<Event<T>> SubscribeStreamInternal<T>(
cancel))
{
LastSeenTxn = evt.TxnTime;
stream.StartTs = evt.TxnTime;
stream.LastCursor = evt.Cursor;
StatsCollector?.Add(evt.Stats);
yield return evt;
}
Expand Down
82 changes: 55 additions & 27 deletions Fauna/Core/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using Fauna.Exceptions;
using Fauna.Mapping;
using Fauna.Serialization;
using Fauna.Types;
using Polly;
using Stream = System.IO.Stream;
Expand Down Expand Up @@ -57,44 +60,62 @@ public async IAsyncEnumerable<Event<T>> OpenStream<T>(
while (!cancellationToken.IsCancellationRequested)
{
var listener = new EventListener<T>();
Task streamTask = _cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () =>
{
var streamData = new MemoryStream();
stream.Serialize(streamData);

var response = await _cfg.HttpClient
.SendAsync(
CreateHttpRequest(path, streamData, headers),
HttpCompletionOption.ResponseHeadersRead,
cancellationToken)
.ConfigureAwait(false);
Task<PolicyResult<HttpResponseMessage>> streamTask =
_cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () =>
{
var streamData = new MemoryStream();
stream.Serialize(streamData);

await using var streamAsync = await response.Content.ReadAsStreamAsync(cancellationToken);
using var streamReader = new StreamReader(streamAsync);
var response = await _cfg.HttpClient
.SendAsync(
CreateHttpRequest(path, streamData, headers),
HttpCompletionOption.ResponseHeadersRead,
cancellationToken)
.ConfigureAwait(false);

while (!streamReader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
string? line = await streamReader.ReadLineAsync().WaitAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(line))
if (!response.IsSuccessStatusCode)
{
continue;
listener.BreakAndClose();
return response;
}

var evt = Event<T>.From(line, ctx);
stream.StartTs = evt.TxnTime;
listener.Dispatch(evt);
}
await using var streamAsync = await response.Content.ReadAsStreamAsync(cancellationToken);
using var streamReader = new StreamReader(streamAsync);

listener.Close();
return response;
});
while (!streamReader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
string? line = await streamReader.ReadLineAsync().WaitAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(line))
{
continue;
}

var evt = Event<T>.From(line, ctx);
stream.LastCursor = evt.Cursor;
listener.Dispatch(evt);
}

listener.Close();
return response;
});

await foreach (var evt in listener.Events().WithCancellation(cancellationToken))
{
if (evt is null) break;

yield return evt;
}

await streamTask;
if (streamTask.Result.Result.IsSuccessStatusCode)
{
continue;
}

var httpResponse = streamTask.Result.Result;
string body = await httpResponse.Content.ReadAsStringAsync(cancellationToken);

throw ExceptionFactory.FromRawResponse(body, httpResponse);
}
}

Expand All @@ -104,7 +125,7 @@ public async IAsyncEnumerable<Event<T>> OpenStream<T>(
/// <typeparam name="T">The type of event data.</typeparam>
private class EventListener<T> where T : notnull
{
private readonly ConcurrentQueue<Event<T>> _queue = new();
private readonly ConcurrentQueue<Event<T>?> _queue = new();
private readonly SemaphoreSlim _semaphore = new(0);
private bool _closed;

Expand All @@ -114,7 +135,14 @@ public void Dispatch(Event<T> evt)
_semaphore.Release();
}

public async IAsyncEnumerable<Event<T>> Events()
public void BreakAndClose()
{
_queue.Enqueue(null);
_semaphore.Release();
Close();
}

public async IAsyncEnumerable<Event<T>?> Events()
{
while (true)
{
Expand Down
5 changes: 5 additions & 0 deletions Fauna/Core/ResponseFields.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ internal readonly struct ResponseFields
/// </summary>
public const string LastSeenTxnFieldName = "txn_ts";

/// <summary>
/// Field name for the stream cursor of the response.
/// </summary>
public const string CursorFieldName = "cursor";

/// <summary>
/// Field name for static type information in the response.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Fauna/Core/StreamEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class StreamEnumerable<T> where T : notnull
private readonly Stream _stream;
private readonly CancellationToken _cancel;

public string Token => _stream.Token;

internal StreamEnumerable(
BaseClient client,
Stream stream,
Expand Down
42 changes: 42 additions & 0 deletions Fauna/Core/StreamOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace Fauna;

/// <summary>
/// Represents the options when subscribing to Fauna Streams.
/// </summary>
public class StreamOptions
{
/// <summary>
/// Initializes a new instance of the <see cref="StreamOptions"/> class with the specified token and cursor.
/// </summary>
/// <param name="token">The token returned from Fauna when the stream is created.</param>
/// <param name="cursor">The cursor from the stream, must be used with the associated Token. Used to resume the stream.</param>
/// <seealso href="https://docs.fauna.com/fauna/current/reference/streaming/#restart-a-stream"/>
public StreamOptions(string token, string cursor)
{
Token = token;
Cursor = cursor;
}

/// <summary>
/// Initializes a new instance of the <see cref="StreamOptions"/> class with the specified token and start timestamp.
/// </summary>
/// <param name="token">The token returned from Fauna when the stream is created.</param>
/// <param name="startTs">The start timestamp to use for the stream.</param>
public StreamOptions(string token, long startTs)
{
Token = token;
StartTs = startTs;
}

// <summary>Token returned from Fauna when the stream is created.</summary>
/// <see href="https://docs.fauna.com/fauna/current/reference/http/reference/stream/get/"/>
public string? Token { get; }

/// <summary>Cursor from the stream, must be used with the associated Token. Used to resume the stream.</summary>
/// <see href="https://docs.fauna.com/fauna/current/reference/streaming/#restart-a-stream"/>
public string? Cursor { get; }

// <summary>Start timestamp from the stream, must be used with the associated Token. Used to resume the stream.</summary>
/// <see href="https://docs.fauna.com/fauna/current/reference/streaming/#restart-a-stream"/>
public long? StartTs { get; }
}
26 changes: 19 additions & 7 deletions Fauna/IClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,16 @@ public IAsyncEnumerable<Page<T>> PaginateAsync<T>(
ISerializer elemSerializer,
QueryOptions? queryOptions = null,
CancellationToken cancel = default);

}

/// <summary>
/// The base class for Client and DataContext.
/// </summary>
public abstract class BaseClient : IClient
{
internal BaseClient() { }
internal BaseClient()
{
}

internal abstract MappingContext MappingCtx { get; }

Expand Down Expand Up @@ -520,30 +521,41 @@ internal abstract IAsyncEnumerator<Event<T>> SubscribeStreamInternal<T>(
/// <typeparam name="T">Event Data will be deserialized to this type.</typeparam>
/// <param name="query">The query to create the stream from Fauna.</param>
/// <param name="queryOptions">The options for the query.</param>
/// <param name="streamOptions">The options for the stream.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains a stream of events.</returns>
public async Task<StreamEnumerable<T>> EventStreamAsync<T>(
Query query,
QueryOptions? queryOptions = null,
StreamOptions? streamOptions = null,
CancellationToken cancellationToken = default) where T : notnull
{
Stream stream = streamOptions?.Token != null
? new Stream(streamOptions.Token) { LastCursor = streamOptions.Cursor, StartTs = streamOptions.StartTs }
: await GetStreamFromQueryAsync(query, queryOptions, cancellationToken);

return new StreamEnumerable<T>(this, stream, cancellationToken);
}

private async Task<Stream> GetStreamFromQueryAsync(
Query query,
QueryOptions? queryOptions,
CancellationToken cancellationToken)
{
var response = await QueryAsync<Stream>(
query,
queryOptions,
cancellationToken);

return new StreamEnumerable<T>(
this,
response.Data,
cancellationToken);
return response.Data;
}

/// <summary>
/// Opens the stream with Fauna and returns an enumerator for the stream events.
/// </summary>
/// <typeparam name="T">Event Data will be deserialized to this type.</typeparam>
/// <param name="stream">The stream to subscribe to.</param>
/// <param name="ctx">Mapping context for stream.</param?
/// <param name="ctx">Mapping context for stream.</param>
/// <param name="cancel">The cancellation token.</param>
/// <returns>An async enumerator of stream events.</returns>
public IAsyncEnumerator<Event<T>> SubscribeStream<T>(
Expand Down
Loading

0 comments on commit b0c855b

Please sign in to comment.