Skip to content

Commit

Permalink
Add Stream.TransactEx
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 19, 2021
1 parent 2093cb3 commit db37a20
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
24 changes: 16 additions & 8 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,50 @@ type MaxResyncsExhaustedException(count) =
/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
type Decider<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException : int -> exn,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy) =

let transact decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let throwMaxResyncsExhaustedException attempts = MaxResyncsExhaustedException attempts
let handleResyncsExceeded = defaultArg createAttemptsExhaustedException throwMaxResyncsExhaustedException
Flow.transact (maxAttempts, resyncPolicy, handleResyncsExceeded) (stream, log) decide mapResult
let inline createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult

/// 0. Invoke the supplied <c>interpret</c> function with the present state
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
member __.Transact(interpret : 'state -> 'event list) : Async<unit> =
transact (fun state -> async { return (), interpret state }) (fun () _context -> ())
transact (fun context -> async { return (), interpret context.State }) (fun () _context -> ())

/// 0. Invoke the supplied <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.Transact(decide : 'state -> 'result * 'event list) : Async<'result> =
transact (fun state -> async { return decide state }) (fun result _context -> result)
transact (fun context -> async { return decide context.State }) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Yield result
member __.TransactAsync(decide : 'state -> Async<'result * 'event list>) : Async<'result> =
transact decide (fun result _context -> result)
transact (fun context -> async { return! decide context.State }) (fun result _context -> result)

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state (including extended context), holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Uses <c>mapResult</c> to render the final outcome from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the outcome
member __.TransactEx(decide : ISyncContext<'state> -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> =
transact decide mapResult

/// 0. Invoke the supplied <c>_Async_</c> <c>decide</c> function with the present state, holding the <c>'result</c>
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
/// 1b. Tries up to <c>maxAttempts</c> times in the case of a conflict, throwing <c>MaxResyncsExhaustedException</c> to signal failure.
/// 2. Uses <c>mapResult</c> to render the final outcome from the <c>'result</c> and/or the final <c>ISyncContext</c>
/// 3. Yields the outcome
member __.TransactAsyncEx(decide : 'state -> Async<'result * 'event list>, mapResult : 'result -> ISyncContext<'state> -> 'resultEx) : Async<'resultEx> =
transact decide mapResult
transact (fun context -> async { return! decide context.State }) mapResult

/// Project from the folded <c>'state</c>, without executing a decision flow as <c>Transact</c> does
member __.Query(projection : 'state -> 'view) : Async<'view> =
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ module internal Flow =
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(syncState : SyncState<'event, 'state>)
(decide : 'state -> Async<'result * 'event list>)
(decide : ISyncContext<'state> -> Async<'result * 'event list>)
(mapResult : 'result -> SyncState<'event, 'state> -> 'resultEx)
: Async<'resultEx> =

Expand All @@ -87,7 +87,7 @@ module internal Flow =
/// Run a decision cycle - decide what events should be appended given the presented state
let rec loop attempt : Async<'resultEx> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
let! result, events = decide (syncState :> ISyncContext<'state>).State
let! result, events = decide (syncState :> ISyncContext<'state>)
if List.isEmpty events then
log.Debug "No events generated"
return mapResult result syncState
Expand Down

0 comments on commit db37a20

Please sign in to comment.