From 756c2b606012e427a2e20a4c7bdf314f2be501a9 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sun, 15 Nov 2020 23:37:54 +0000 Subject: [PATCH] Add Stream.TransactEx --- src/Equinox/Equinox.fs | 24 ++++++++++++++++-------- src/Equinox/Flow.fs | 4 ++-- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/Equinox/Equinox.fs b/src/Equinox/Equinox.fs index 400e2a1a2..90ad4c8da 100755 --- a/src/Equinox/Equinox.fs +++ b/src/Equinox/Equinox.fs @@ -10,34 +10,42 @@ type MaxResyncsExhaustedException(count) = /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic type Stream<'event, 'state> ( log, stream : IStream<'event, 'state>, maxAttempts : int, - [] ?createAttemptsExhaustedException, + [] ?createAttemptsExhaustedException : int -> exn, [] ?resyncPolicy) = let transact decide mapResult = let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF }) - let throwMaxResyncsExhaustedException attempts = MaxResyncsExhaustedException attempts - let handleResyncsExceeded = defaultArg createAttemptsExhaustedException throwMaxResyncsExhaustedException - Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) decide mapResult + let inline createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn + let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException + Flow.transact (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult /// 0. Invoke the supplied interpret function with the present state /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. member __.Transact(interpret : 'state -> 'event list) : Async = - transact (fun state -> async { return (), interpret state }) (fun () _context -> ()) + transact (fun context -> async { return (), interpret context.State }) (fun () _context -> ()) /// 0. Invoke the supplied decide function with the present state, holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Yield result member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> = - transact (fun state -> async { return decide state }) (fun result _context -> result) + transact (fun context -> async { return decide context.State }) (fun result _context -> result) /// 0. Invoke the supplied _Async_ decide function with the present state, holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. /// 2. Yield result member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> = - transact decide (fun result _context -> result) + transact (fun context -> async { return! decide context.State }) (fun result _context -> result) + + /// 0. Invoke the supplied _Async_ decide function with the present state (including extended context), holding the 'result + /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream + /// 1b. Tries up to maxAttempts times in the case of a conflict, throwing MaxResyncsExhaustedException to signal failure. + /// 2. Uses mapResult to render the final outcome from the 'result and/or the final ISyncContext + /// 3. Yields the outcome + member __.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> = + transact decide mapResult /// 0. Invoke the supplied _Async_ decide function with the present state, holding the 'result /// 1a. (if events yielded) Attempt to sync the yielded events events to the stream @@ -45,7 +53,7 @@ type Stream<'event, 'state> /// 2. Uses mapResult to render the final outcome from the 'result and/or the final ISyncContext /// 3. Yields the outcome member __.TransactAsyncEx(decide : 'state -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> = - transact decide mapResult + transact (fun context -> async { return! decide context.State }) mapResult /// Project from the folded 'state, without executing a decision flow as Transact does member __.Query(projection : 'state -> 'view) : Async<'view> = diff --git a/src/Equinox/Flow.fs b/src/Equinox/Flow.fs index 93bc1af0c..29964c826 100755 --- a/src/Equinox/Flow.fs +++ b/src/Equinox/Flow.fs @@ -78,7 +78,7 @@ module internal Flow = /// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) (syncState : SyncState<'event, 'state>) - (decide : 'state -> Async<'result * 'event list>) + (decide : ISyncContext<'state> -> Async<'result * 'event list>) (mapResult : 'result -> SyncState<'event, 'state> -> 'resultEx) : Async<'resultEx> = @@ -87,7 +87,7 @@ module internal Flow = /// Run a decision cycle - decide what events should be appended given the presented state let rec loop attempt : Async<'resultEx> = async { let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) - let! result, events = decide (syncState :> ISyncContext<'state>).State + let! result, events = decide (syncState :> ISyncContext<'state>) if List.isEmpty events then log.Debug "No events generated" return mapResult result syncState