diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e6039a49..08fa600a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added ### Changed + +- Replace `Stream.TransactAsyncEx` with `TransactEx`, providing an extended signature to expose the `ISyncContext` to `decide` as `master` [#263](https://github.com/jet/equinox/pull/263) [#272](https://github.com/jet/equinox/pull/272) + ### Removed ### Fixed diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs index 400e2a1a2..2e04b864c 100755 --- a/src/Equinox/Equinox.fs +++ b/src/Equinox/Equinox.fs @@ -23,33 +23,33 @@ type Stream<'event, 'state> /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. member __.Transact(interpret : 'state -> 'event list) : Async = - transact (fun state -> async { return (), interpret state }) (fun () _context -> ()) + transact (fun context -> async { return (), interpret context.State }) (fun () _context -> ()) /// 0. Invoke the supplied decide function with the present state, holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Yield result member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = - transact (fun state -> async { return decide state }) (fun result _context -> result) + transact (fun context -> async { return decide context.State }) (fun result _context -> result) /// 0. Invoke the supplied _Async_ decide function with the present state, holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Yield result member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = - transact decide (fun result _context -> result) + transact (fun context -> decide context.State) (fun result _context -> result) - /// 0. Invoke the supplied _Async_ decide function with the present state, holding the 'result + /// 0. Invoke the supplied _Async_ decide function with the present state (including extended context), holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Uses mapResult to render the final outcome from the 'result and/or the final ISyncContext /// 3. Yields the outcome - member __.TransactAsyncEx(decide : 'state -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> = + member __.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'view) : Async<'view> = transact decide mapResult /// Project from the folded 'state, without executing a decision flow as Transact does member __.Query(projection : 'state -> 'view) : Async<'view> = - Flow.query (stream, log, fun syncState -> projection (syncState :> ISyncContext<'state>).State) + Flow.query (stream, log, fun context -> projection (context :> ISyncContext<'state>).State) /// Project from the stream's 'state (including extended context), without executing a decision flow as Transact does member __.QueryEx(projection : ISyncContext<'state> -> 'view) : Async<'view> = diff --git a/src/Equinox/Flow.fs b/src/Equinox/Flow.fs index 93bc1af0c..df774ab87 100755 --- a/src/Equinox/Flow.fs +++ b/src/Equinox/Flow.fs @@ -43,7 +43,7 @@ type ISyncContext<'state> = module internal Flow = /// Represents stream and folding state between the load and run/render phases - type SyncState<'event, 'state> + type SyncContext<'event, 'state> ( originState : StreamToken * 'state, trySync : ILogger * StreamToken * 'state * 'event list -> Async>) = let mutable tokenAndState = originState @@ -77,45 +77,45 @@ module internal Flow = /// 2b. if saved without conflict, exit with updated state /// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) - (syncState : SyncState<'event, 'state>) - (decide : 'state -> Async<'result * 'event list>) - (mapResult : 'result -> SyncState<'event, 'state> -> 'resultEx) - : Async<'resultEx> = + (context : SyncContext<'event, 'state>) + (decide : ISyncContext<'state> -> Async<'result * 'event list>) + (mapResult : 'result -> SyncContext<'event, 'state> -> 'view) + : Async<'view> = if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") /// Run a decision cycle - decide what events should be appended given the presented state - let rec loop attempt : Async<'resultEx> = async { + let rec loop attempt : Async<'view> = async { let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) - let! result, events = decide (syncState :> ISyncContext<'state>).State + let! result, events = decide (context :> ISyncContext<'state>) if List.isEmpty events then log.Debug "No events generated" - return mapResult result syncState + return mapResult result context elif attempt = maxSyncAttempts then // Special case: on final attempt, we won't be `resync`ing; we're giving up - let! committed = syncState.TryWithoutResync(log, events) + let! committed = context.TryWithoutResync(log, events) if not committed then log.Debug "Max Sync Attempts exceeded" return raise (createMaxAttemptsExhaustedException attempt) else - return mapResult result syncState + return mapResult result context else - let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events) + let! committed = context.TryOrResync(resyncRetryPolicy, attempt, log, events) if not committed then log.Debug "Resyncing and retrying" return! loop (attempt + 1) else - return mapResult result syncState } + return mapResult result context } /// Commence, processing based on the incoming state loop 1 let transact (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async { let! streamState = stream.Load log - let syncState = SyncState(streamState, stream.TrySync) + let syncState = SyncContext(streamState, stream.TrySync) return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) syncState decide mapResult } - let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event, 'state> -> 'result) : Async<'result> = async { + let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async { let! streamState = stream.Load log - let syncState = SyncState(streamState, stream.TrySync) + let syncState = SyncContext(streamState, stream.TrySync) return project syncState }