Skip to content

Commit

Permalink
Expose pre-state Version etc via TransactEx (#263)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 24, 2021
1 parent 4ed567a commit 1e24745
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 21 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Added
### Changed

- Replace `Stream.TransactAsyncEx` with `TransactEx`, providing an extended signature to expose the `ISyncContext` to `decide` as `master` [#263](https://github.com/jet/equinox/pull/263) [#272](https://github.com/jet/equinox/pull/272)

### Removed
### Fixed

Expand Down
12 changes: 6 additions & 6 deletions src/Equinox/Equinox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,33 @@ type Stream<'event, 'state>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
member __.Transact(interpret : 'state -> 'event list) : Async<unit> =
transact (fun state -> async { return (), interpret state }) (fun () _context -> ())
transact (fun context -> async { return (), interpret context.State }) (fun () _context -> ())

/// 0. Invoke the supplied <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> =
transact (fun state -> async { return decide state }) (fun result _context -> result)
transact (fun context -> async { return decide context.State }) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> =
transact decide (fun result _context -> result)
transact (fun context -> decide context.State) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state (including extended context), holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Uses <c>mapResult</c> to render the final outcome from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the outcome
member __.TransactAsyncEx(decide : 'state -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> =
member __.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'view) : Async<'view> =
transact decide mapResult

/// Project from the folded <c>'state</c>, without executing a decision flow as <c>Transact</c> does
member __.Query(projection : 'state -> 'view) : Async<'view> =
Flow.query (stream, log, fun syncState -> projection (syncState :> ISyncContext<'state>).State)
Flow.query (stream, log, fun context -> projection (context :> ISyncContext<'state>).State)

/// Project from the stream's <c>'state<c> (including extended context), without executing a decision flow as <c>Transact<c> does
member __.QueryEx(projection : ISyncContext<'state> -> 'view) : Async<'view> =
Expand Down
30 changes: 15 additions & 15 deletions src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type ISyncContext<'state> =
module internal Flow =

/// Represents stream and folding state between the load and run/render phases
type SyncState<'event, 'state>
type SyncContext<'event, 'state>
( originState : StreamToken * 'state,
trySync : ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>) =
let mutable tokenAndState = originState
Expand Down Expand Up @@ -77,45 +77,45 @@ module internal Flow =
/// 2b. if saved without conflict, exit with updated state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(syncState : SyncState<'event, 'state>)
(decide : 'state -> Async<'result * 'event list>)
(mapResult : 'result -> SyncState<'event, 'state> -> 'resultEx)
: Async<'resultEx> =
(context : SyncContext<'event, 'state>)
(decide : ISyncContext<'state> -> Async<'result * 'event list>)
(mapResult : 'result -> SyncContext<'event, 'state> -> 'view)
: Async<'view> =

if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")

/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt : Async<'resultEx> = async {
let rec loop attempt : Async<'view> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide (syncState :> ISyncContext<'state>).State
let! result, events = decide (context :> ISyncContext<'state>)
if List.isEmpty events then
log.Debug "No events generated"
return mapResult result syncState
return mapResult result context
elif attempt = maxSyncAttempts then
// Special case: on final attempt, we won't be `resync`ing; we're giving up
let! committed = syncState.TryWithoutResync(log, events)
let! committed = context.TryWithoutResync(log, events)
if not committed then
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
else
return mapResult result syncState
return mapResult result context
else
let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events)
let! committed = context.TryOrResync(resyncRetryPolicy, attempt, log, events)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
else
return mapResult result syncState }
return mapResult result context }

/// Commence, processing based on the incoming state
loop 1

let transact (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
let syncState = SyncContext(streamState, stream.TrySync)
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide mapResult }

let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event, 'state> -> 'result) : Async<'result> = async {
let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = stream.Load log
let syncState = SyncState(streamState, stream.TrySync)
let syncState = SyncContext(streamState, stream.TrySync)
return project syncState }

0 comments on commit 1e24745

Please sign in to comment.