Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streams): Use Cursor #160

Merged
merged 7 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion Fauna.Test/Integration.Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public async Task CanReadEventsFomStream()
{
Assert.Multiple(() =>
{
Assert.NotZero(evt.TxnTime, "should have a txn time");
Assert.IsNotEmpty(evt.Cursor, "should have a cursor");
Assert.NotZero(evt.Stats.ReadOps, "should have consumed ReadOps");
if (evt.Type is EventType.Status)
{
Expand Down Expand Up @@ -386,4 +386,108 @@ public async Task CanReadEventsFomStream()

await Task.CompletedTask;
}

[Test]
[Category("Streaming")]
public Task StreamThrowsWithBadRequest()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test

var ex = Assert.ThrowsAsync<FaunaException>(async () =>
{
var stream = await _client.EventStreamAsync<StreamingSandbox>(FQL($"StreamingSandbox.all().toStream()"),
streamOptions: new StreamOptions("fake", "abc1234=="),
cancellationToken: cts.Token);

await foreach (var _ in stream)
{
Assert.Fail("Should not process events");
}
});

Assert.AreEqual("BadRequest: Bad Request", ex?.Message);

return Task.CompletedTask;
}

[Test]
[Category("Streaming")]
public async Task CanResumeStreamWithStreamOptions()
{
string? token = null;
string? cursor = null;

var queries = new[]
{
FQL($"StreamingSandbox.create({{ foo: 'bar' }})"),
FQL($"StreamingSandbox.all().forEach(.update({{ foo: 'baz' }}))"),
FQL($"StreamingSandbox.all().forEach(.delete())")
};

var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); // prevent runaway test
cts.Token.ThrowIfCancellationRequested();

int expectedEvents = queries.Length + 1;

// create a task to open the stream and process events
var streamTask = Task.Run(() =>
{
Assert.DoesNotThrowAsync(async () =>
{
var stream = await _client.EventStreamAsync<StreamingSandbox>(
FQL($"StreamingSandbox.all().toStream()"),
cancellationToken: cts.Token
);
Assert.NotNull(stream);
token = stream.Token;

await foreach (var evt in stream)
{
// break after the first received event
cursor = evt.Cursor;
// ReSharper disable once AccessToModifiedClosure
expectedEvents--;
break;
}
});
}, cts.Token);

// invoke queries on a delay to simulate streaming events
var queryTasks = queries.Select(
(query, index) => Task.Delay((index + 1) * 500, cts.Token)
.ContinueWith(
_ =>
{
Assert.DoesNotThrowAsync(async () => { await _client.QueryAsync(query, cancel: cts.Token); },
"Should successfully invoke query");
}, TaskContinuationOptions.ExecuteSynchronously));

// wait for all tasks
queryTasks = queryTasks.Append(streamTask);
Task.WaitAll(queryTasks.ToArray(), cts.Token);

Assert.NotNull(token, "should have a token");
Assert.NotNull(cursor, "should have a cursor from the first event");

var stream = await _client.EventStreamAsync<StreamingSandbox>(
FQL($"StreamingSandbox.all().toStream()"),
streamOptions: new StreamOptions(token!, cursor!),
cancellationToken: cts.Token
);
Assert.NotNull(stream);

await foreach (var evt in stream)
{
Assert.IsNotEmpty(evt.Cursor, "should have a cursor");
expectedEvents--;
if (expectedEvents > 0)
{
continue;
}

break;
}

Assert.Zero(expectedEvents, "stream handler should process all events");
}
}
2 changes: 1 addition & 1 deletion Fauna/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ internal override async IAsyncEnumerator<Event<T>> SubscribeStreamInternal<T>(
cancel))
{
LastSeenTxn = evt.TxnTime;
stream.StartTs = evt.TxnTime;
stream.LastCursor = evt.Cursor;
StatsCollector?.Add(evt.Stats);
yield return evt;
}
Expand Down
82 changes: 55 additions & 27 deletions Fauna/Core/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System.Collections.Concurrent;
using System.Net;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using Fauna.Exceptions;
using Fauna.Mapping;
using Fauna.Serialization;
using Fauna.Types;
using Polly;
using Stream = System.IO.Stream;
Expand Down Expand Up @@ -57,44 +60,62 @@ public async IAsyncEnumerable<Event<T>> OpenStream<T>(
while (!cancellationToken.IsCancellationRequested)
{
var listener = new EventListener<T>();
Task streamTask = _cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () =>
{
var streamData = new MemoryStream();
stream.Serialize(streamData);

var response = await _cfg.HttpClient
.SendAsync(
CreateHttpRequest(path, streamData, headers),
HttpCompletionOption.ResponseHeadersRead,
cancellationToken)
.ConfigureAwait(false);
Task<PolicyResult<HttpResponseMessage>> streamTask =
_cfg.RetryConfiguration.RetryPolicy.ExecuteAndCaptureAsync(async () =>
{
var streamData = new MemoryStream();
stream.Serialize(streamData);

await using var streamAsync = await response.Content.ReadAsStreamAsync(cancellationToken);
using var streamReader = new StreamReader(streamAsync);
var response = await _cfg.HttpClient
.SendAsync(
CreateHttpRequest(path, streamData, headers),
HttpCompletionOption.ResponseHeadersRead,
cancellationToken)
.ConfigureAwait(false);

while (!streamReader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
string? line = await streamReader.ReadLineAsync().WaitAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(line))
if (!response.IsSuccessStatusCode)
{
continue;
listener.BreakAndClose();
return response;
}

var evt = Event<T>.From(line, ctx);
stream.StartTs = evt.TxnTime;
listener.Dispatch(evt);
}
await using var streamAsync = await response.Content.ReadAsStreamAsync(cancellationToken);
using var streamReader = new StreamReader(streamAsync);

listener.Close();
return response;
});
while (!streamReader.EndOfStream && !cancellationToken.IsCancellationRequested)
{
string? line = await streamReader.ReadLineAsync().WaitAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(line))
{
continue;
}

var evt = Event<T>.From(line, ctx);
stream.LastCursor = evt.Cursor;
listener.Dispatch(evt);
}

listener.Close();
return response;
});

await foreach (var evt in listener.Events().WithCancellation(cancellationToken))
{
if (evt is null) break;

yield return evt;
}

await streamTask;
if (streamTask.Result.Result.IsSuccessStatusCode)
{
continue;
}

var httpResponse = streamTask.Result.Result;
string body = await httpResponse.Content.ReadAsStringAsync(cancellationToken);

throw ExceptionFactory.FromRawResponse(body, httpResponse);
}
}

Expand All @@ -104,7 +125,7 @@ public async IAsyncEnumerable<Event<T>> OpenStream<T>(
/// <typeparam name="T">The type of event data.</typeparam>
private class EventListener<T> where T : notnull
{
private readonly ConcurrentQueue<Event<T>> _queue = new();
private readonly ConcurrentQueue<Event<T>?> _queue = new();
private readonly SemaphoreSlim _semaphore = new(0);
private bool _closed;

Expand All @@ -114,7 +135,14 @@ public void Dispatch(Event<T> evt)
_semaphore.Release();
}

public async IAsyncEnumerable<Event<T>> Events()
public void BreakAndClose()
{
_queue.Enqueue(null);
_semaphore.Release();
Close();
}

public async IAsyncEnumerable<Event<T>?> Events()
{
while (true)
{
Expand Down
5 changes: 5 additions & 0 deletions Fauna/Core/ResponseFields.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ internal readonly struct ResponseFields
/// </summary>
public const string LastSeenTxnFieldName = "txn_ts";

/// <summary>
/// Field name for the stream cursor of the response.
/// </summary>
public const string CursorFieldName = "cursor";

/// <summary>
/// Field name for static type information in the response.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions Fauna/Core/StreamEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public class StreamEnumerable<T> where T : notnull
private readonly Stream _stream;
private readonly CancellationToken _cancel;

public string Token => _stream.Token;

internal StreamEnumerable(
BaseClient client,
Stream stream,
Expand Down
42 changes: 42 additions & 0 deletions Fauna/Core/StreamOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace Fauna;

/// <summary>
/// Represents the options when subscribing to Fauna Streams.
/// </summary>
public class StreamOptions
pnwpedro marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Initializes a new instance of the <see cref="StreamOptions"/> class with the specified token and cursor.
/// </summary>
/// <param name="token">The token returned from Fauna when the stream is created.</param>
/// <param name="cursor">The cursor from the stream, must be used with the associated Token. Used to resume the stream.</param>
/// <seealso href="https://docs.fauna.com/fauna/current/reference/streaming/#restart-a-stream"/>
public StreamOptions(string token, string cursor)
{
Token = token;
Cursor = cursor;
}

/// <summary>
/// Initializes a new instance of the <see cref="StreamOptions"/> class with the specified token and start timestamp.
/// </summary>
/// <param name="token">The token returned from Fauna when the stream is created.</param>
/// <param name="startTs">The start timestamp to use for the stream.</param>
public StreamOptions(string token, long startTs)
{
Token = token;
StartTs = startTs;
}

// <summary>Token returned from Fauna when the stream is created.</summary>
/// <see href="https://docs.fauna.com/fauna/current/reference/http/reference/stream/get/"/>
public string? Token { get; }

/// <summary>Cursor from the stream, must be used with the associated Token. Used to resume the stream.</summary>
/// <see href="https://docs.fauna.com/fauna/current/reference/streaming/#restart-a-stream"/>
public string? Cursor { get; }

// <summary>Start timestamp from the stream, must be used with the associated Token. Used to resume the stream.</summary>
/// <see href="https://docs.fauna.com/fauna/current/reference/streaming/#restart-a-stream"/>
public long? StartTs { get; }
}
26 changes: 19 additions & 7 deletions Fauna/IClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,16 @@ public IAsyncEnumerable<Page<T>> PaginateAsync<T>(
ISerializer elemSerializer,
QueryOptions? queryOptions = null,
CancellationToken cancel = default);

}

/// <summary>
/// The base class for Client and DataContext.
/// </summary>
public abstract class BaseClient : IClient
{
internal BaseClient() { }
internal BaseClient()
{
}

internal abstract MappingContext MappingCtx { get; }

Expand Down Expand Up @@ -520,30 +521,41 @@ internal abstract IAsyncEnumerator<Event<T>> SubscribeStreamInternal<T>(
/// <typeparam name="T">Event Data will be deserialized to this type.</typeparam>
/// <param name="query">The query to create the stream from Fauna.</param>
/// <param name="queryOptions">The options for the query.</param>
/// <param name="streamOptions">The options for the stream.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation. The task result contains a stream of events.</returns>
public async Task<StreamEnumerable<T>> EventStreamAsync<T>(
Query query,
QueryOptions? queryOptions = null,
StreamOptions? streamOptions = null,
CancellationToken cancellationToken = default) where T : notnull
{
Stream stream = streamOptions?.Token != null
? new Stream(streamOptions.Token) { LastCursor = streamOptions.Cursor, StartTs = streamOptions.StartTs }
: await GetStreamFromQueryAsync(query, queryOptions, cancellationToken);

return new StreamEnumerable<T>(this, stream, cancellationToken);
}

private async Task<Stream> GetStreamFromQueryAsync(
Query query,
QueryOptions? queryOptions,
CancellationToken cancellationToken)
{
var response = await QueryAsync<Stream>(
query,
queryOptions,
cancellationToken);

return new StreamEnumerable<T>(
this,
response.Data,
cancellationToken);
return response.Data;
}

/// <summary>
/// Opens the stream with Fauna and returns an enumerator for the stream events.
/// </summary>
/// <typeparam name="T">Event Data will be deserialized to this type.</typeparam>
/// <param name="stream">The stream to subscribe to.</param>
/// <param name="ctx">Mapping context for stream.</param?
/// <param name="ctx">Mapping context for stream.</param>
/// <param name="cancel">The cancellation token.</param>
/// <returns>An async enumerator of stream events.</returns>
public IAsyncEnumerator<Event<T>> SubscribeStream<T>(
Expand Down
Loading
Loading