Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Equinox.Stream -> Decider #272

Merged
merged 2 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 53 additions & 53 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -359,20 +359,20 @@ let decideY ... (state : Fold.State) : Decision * Events list = ...
`module Fold` to use `initial` and `fold`)

```fsharp
type Service internal (resolve : Id -> Equinox.Stream<Events.Event, Fold.State) = ...`
type Service internal (resolve : Id -> Equinox.Decider<Events.Event, Fold.State) = ...`

member __.Execute(id, command) : Async<unit> =
let stream = resolve id
stream.Transact(interpretX command)
let decider = resolve id
decider.Transact(interpretX command)

member __.Decide(id, inputs) : Async<Decision> =
let stream = resolve id
stream.Transact(decideX inputs)
let decider = resolve id
decider.Transact(decideX inputs)

let create resolve =
let resolve id =
let stream = resolve (streamName id)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 3)
Equinox.Decider(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 3)
Service(resolve)
```

Expand Down Expand Up @@ -593,13 +593,13 @@ let toSnapshot state = [Event.Snapshotted (Array.ofList state)]
* Equinox Stream typically the service should be a stateless Singleton
*)

type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =
let execute clientId command : Async<unit> =
let stream = resolve clientId
stream.Transact(interpret command)
let decider = resolve clientId
decider.Transact(interpret command)
let read clientId : Async<string list> =
let stream = resolve clientId
stream.Query id
let decider = resolve clientId
decider.Query id

member __.Execute(clientId, command) =
execute clientId command
Expand All @@ -612,7 +612,7 @@ type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.S

let create resolve : Service =
let resolve id =
Equinox.Stream(Serilog.Log.ForContext<Service>(), resolve (streamName id), maxAttempts = 3)
Equinox.Decider(Serilog.Log.ForContext<Service>(), resolve (streamName id), maxAttempts = 3)
Service(resolve)
```

Expand Down Expand Up @@ -696,9 +696,9 @@ elements you'll touch in a normal application are:
internal implementation of Optimistic Concurrency Control / retry loop used
by `Stream`. It's recommended to at least scan this file as it defines the
bartelink marked this conversation as resolved.
Show resolved Hide resolved
Transaction semantics everything is coming together in service of.
- [`type Stream`](https://github.com/jet/equinox/blob/master/src/Equinox/Equinox.fs#L11) -
- [`type Decider`](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L11) -
surface API one uses to `Transact` or `Query` against a specific stream
- [`type Target` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Equinox.fs#L42) -
- [`type Target` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Decider.fs#L42) -
used to identify the Stream pertaining to the relevant Aggregate that
`resolve` will use to hydrate a `Stream`
bartelink marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -710,7 +710,7 @@ follow!
#### Stream Members

```fsharp
type Equinox.Stream(stream : IStream<'event, 'state>, log, maxAttempts) =
type Equinox.Decider(stream : IStream<'event, 'state>, log, maxAttempts) =
StoreIntegration
// Run interpret function with present state, retrying with Optimistic Concurrency
member __.Transact(interpret : State -> Event list) : Async<unit>
Expand Down Expand Up @@ -832,27 +832,27 @@ context
let [<Literal>] Category = Favorites
let streamName (clientId : String) = FsCodec.StreamName.create Category clientId

type Service internal (resolve : string -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : string -> Equinox.Decider<Events.Event, Fold.State>) =

let execute clientId command : Async<unit> =
let stream = resolve clientId
stream.Transact(interpret command)
let decider = resolve clientId
decider.Transact(interpret command)

let read clientId : Async<string list> =
let stream = resolve clientId
inner.Query id
let decider = resolve clientId
decider.Query id

let create resolve =
let resolve clientId =
let streamName = streamName clientId
Equinox.Stream(log, resolve streamName, maxAttempts = 3)
Equinox.Decider(log, resolve streamName, maxAttempts = 3)
Service(resolve)
```

The `Stream`-related functions in a given Aggregate establish the access
The `Decider`-related functions in a given Aggregate establish the access
patterns used across when Service methods access streams (see below). Typically
these are relatively straightforward calls forwarding to a `Equinox.Stream`
equivalent (see [`src/Equinox/Equinox.fs`](src/Equinox/Equinox.fs)), which in
these are relatively straightforward calls forwarding to a `Equinox.Decider`
equivalent (see [`src/Equinox/Decider.fs`](src/Equinox/Decider.fs)), which in
turn use the Optimistic Concurrency retry-loop in
[`src/Equinox/Flow.fs`](src/Equinox/Flow.fs).

Expand Down Expand Up @@ -1037,20 +1037,20 @@ let interpret c (state : State) =
#### `Service`

```fsharp
type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

let execute clientId command : Async<unit> =
let stream = resolve clientId
stream.Transact(interpret command)
let decider = resolve clientId
decider.Transact(interpret command)
let handle clientId command : Async<Todo list> =
let stream = resolve clientId
stream.Transact(fun state ->
let decider = resolve clientId
decider.Transact(fun state ->
let events = interpret command state
let state' = fold state events
state'.items,events)
let query clientId (projection : State -> T) : Async<T> =
let stream = resolve clientId
stream.Query projection
let decider = resolve clientId
decider.Query projection

member __.List clientId : Async<Todo seq> =
query clientId (fun s -> s.items |> Seq.ofList)
Expand Down Expand Up @@ -1098,7 +1098,7 @@ type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.S
<a name="queries"></a>
# Queries

Queries are handled by `Equinox.Stream`s' `Query` function.
Queries are handled by `Equinox.Decider`s' `Query` function.

A _query_ projects a value from the `'state` of an Aggregate. Queries should be
used sparingly, as loading and folding the events each time is against the
Expand All @@ -1109,19 +1109,19 @@ lead to the leaking of decision logic outside of the Aggregate's `module`.
```fsharp
// Query function exposing part of the state
member __.ReadAddress(clientId) =
let stream = resolve clientId
stream.Query(fun state -> state.address)
let decider = resolve clientId
decider.Query(fun state -> state.address)

// Return the entire state we hold for this aggregate (NOTE: generally not a good idea)
member __.Read(clientId) =
let stream = resolve clientId
stream.Query id
let decider = resolve clientId
decider.Query id
```

<a name="commands"></a>
# Command+Decision Handling functions

Commands or Decisions are handled via `Equinox.Stream`'s `Transact` method
Commands or Decisions are handled via `Equinox.Decider`'s `Transact` method

## Commands (`interpret` signature)

Expand Down Expand Up @@ -1160,18 +1160,18 @@ let interpret (context, command) state : Events.Event list =
| Some eventDetails -> // accepted, mapped to event details record
[Event.HandledCommand eventDetails]

type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.State>)
type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>)

// Given the supplied context, apply the command for the specified clientId
member __.Execute(clientId, context, command) : Async<unit> =
let stream = resolve clientId
stream.Transact(fun state -> interpretCommand (context, command) state)
let decider = resolve clientId
decider.Transact(fun state -> interpretCommand (context, command) state)

// Given the supplied context, apply the command for the specified clientId
// Throws if this client's data is marked Read Only
member __.Execute(clientId, context, command) : Async<unit> =
let stream = resolve clientId
stream.Transact(fun state ->
let decider = resolve clientId
decider.Transact(fun state ->
if state.isReadOnly then raise AccessDeniedException() // Mapped to 403 externally
interpretCommand (context, command) state)
```
Expand All @@ -1189,7 +1189,7 @@ In this case, the signature is: `let decide (context, command, args) state :
'result * Events.Event list`

Note that the return value is a _tuple_ of `('result,Event list):
- the `fst` element is returned from `stream.Transact`
- the `fst` element is returned from `decider.Transact`
- the `snd` element of the tuple represents the events (if any) that should
represent the state change implied by the request.with

Expand All @@ -1203,15 +1203,15 @@ let decide (context, command) state : int * Events.Event list =
// ... if `snd` contains event, they are written
// `fst` (an `int` in this instance) is returned as the outcome to the caller

type Service internal (resolve : ClientId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

// Given the supplied context, attempt to apply the command for the specified clientId
// NOTE Try will return the `fst` of the tuple that `decide` returned
// If >1 attempt was necessary (e.g., due to conflicting events), the `fst`
// from the last attempt is the outcome
member __.Try(clientId, context, command) : Async<int> =
let stream = resolve clientId
stream.Transact(fun state ->
let decider = resolve clientId
decider.Transact(fun state ->
decide (context, command) state)
```

Expand Down Expand Up @@ -1317,11 +1317,11 @@ let interpretMany fold interpreters (state : 'state) : 'state * 'event list =
let state' = fold state events
state', acc @ events)

type Service internal (resolve : CartId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.State>) =

member __.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let stream = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
stream.TransactAsync(fun state -> async {
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
decider.TransactAsync(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpret commands) state })
```
Expand Down Expand Up @@ -1381,8 +1381,8 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt

type Service ... =
member __.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let stream = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
stream.TransactAsync(fun state -> async {
let decider = resolve (cartId,if optimistic then Some Equinox.AllowStale else None)
decider.TransactAsync(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
let acc = Accumulator(Fold.fold, state)
for cmd in commands do
Expand Down Expand Up @@ -2236,8 +2236,8 @@ rich relative to the need of consumers to date. Some things remain though:
- Provide a low level walking events in F# API akin to
`Equinox.CosmosStore.Core.Events`; this would allow consumers to jump from direct
use of `EventStore.ClientAPI` -> `Equinox.EventStore.Core.Events` ->
`Equinox.Stream` (with the potential to swap stores once one gets to using
`Equinox.Stream`)
`Equinox.Decider` (with the potential to swap stores once one gets to using
`Equinox.Decider`)
- Get conflict handling as efficient and predictable as for `Equinox.CosmosStore`
https://github.com/jet/equinox/issues/28
- provide for snapshots to be stored out of the stream, and loaded in a
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ Ouch, not looking forward to reading all that logic :frown: ? [Have a read, it's
<a name="how-is-expectedVersion-managed"/></a>
### Help me understand how the `expectedVersion` is used with EventStoreDB - it seems very confusing :pray: [@dharmaturtle](https://github.com/dharmaturtle)

> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Stream.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` and `TrySync` that take a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`?
> I'm having some trouble understanding how Equinox+ESDB handles "expected version". Most of the examples use `Equinox.Decider.Transact` which is storage agnostic and doesn't offer any obvious concurrency checking. In `Equinox.EventStore.Context`, there's a `Sync` and `TrySync` that take a `Token` which holds a `streamVersion`. Should I be be using that instead of `Transact`?

The bulk of the implementation is in [`Equinox/Flow.fs`](https://github.com/jet/equinox/blob/master/src/Equinox/Flow.fs)

Expand Down
10 changes: 5 additions & 5 deletions diagrams/CosmosCode.puml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title Code diagram for Equinox.CosmosStore Query operation, with empty cache and nothing written to the stream yet

actor Caller order 20
box "Equinox.Stream"
box "Equinox.Decider"
participant Stream order 40
end box
participant Aggregate order 50
Expand Down Expand Up @@ -33,7 +33,7 @@ Stream -> Caller: {result = list }
title Code diagram for Equinox.CosmosStore Transact operation, with cache up to date using Snapshotting Access Strategy

actor Caller order 20
box "Equinox.Stream"
box "Equinox.Decider"
participant Stream order 40
end box
participant Aggregate order 50
Expand Down Expand Up @@ -118,8 +118,8 @@ Stream -> Caller: proposedResult
title Code diagram for Equinox.CosmosStore Query operation immediately following a Query/Transact on the same node, i.e. cached

actor Caller order 20
box "Equinox.Stream"
participant Stream order 40
box "Equinox.Decider"
participant Decider order 40
end box
participant Aggregate order 50
participant Service order 60
Expand Down Expand Up @@ -147,7 +147,7 @@ Aggregate -> Caller: result
title Code diagram for Equinox.CosmosStore Query operation on a node without an in-sync cached value (with snapshotting Access Strategy)

actor Caller order 20
box "Equinox.Stream"
box "Equinox.Decider"
participant Stream order 40
end box
participant Aggregate order 50
Expand Down
2 changes: 1 addition & 1 deletion diagrams/CosmosComponent.puml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ actor "Applications" <<External Person>> as apps
rectangle "Consistent Processing" <<Container>> {
frame Domain {
rectangle "Equinox" as eqx <<Component>> {
rectangle "Equinox.Stream" as stream <<Component>>
rectangle "Equinox.Decider" as stream <<Component>>
interface "IStream" <<Component>>
}
rectangle aggregate <<Component>> [
Expand Down
2 changes: 1 addition & 1 deletion diagrams/CosmosContainer.puml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ rectangle "Consistent Processing" <<Expanded>> {
Service
]
rectangle stream <<Component>> [
Equinox.Stream
Equinox.Decider
]
}

Expand Down
2 changes: 1 addition & 1 deletion diagrams/EventStore.puml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ rectangle "Application Consistent Processing" <<External Container>> {
Service
]
rectangle stream <<Component>> [
Equinox.Stream
Equinox.Decider
]
interface IStream <<Component>>
}
Expand Down
8 changes: 4 additions & 4 deletions diagrams/EventStoreCode.puml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title Code diagram for Equinox.EventStore / Equinox.SqlStreamStore Query operation\nwith a cold cache and an empty stream

actor Caller order 20
box "Equinox.Stream"
box "Equinox.Decider"
participant Stream order 40
end box
participant Aggregate order 50
Expand Down Expand Up @@ -34,7 +34,7 @@ Stream -> Caller: {result = list }
title Code diagram for Equinox.EventStore / Equinox.SqlStreamStore Transact operation\nwith an-in sync cache and snapshotting Access Strategy

actor Caller order 20
box "Equinox.Stream"
box "Equinox.Decider"
participant Stream order 40
end box
participant Aggregate order 50
Expand Down Expand Up @@ -101,7 +101,7 @@ Stream -> Caller: proposedResult
title Code diagram for Equinox.EventStore / Equinox.SqlStreamStore Query\nfollowing a Query or Transact on same node, i.e. in sync cache, events on stream

actor Caller order 20
box "Equinox.Stream"
box "Equinox.Decider"
participant Stream order 40
end box
participant Aggregate order 50
Expand Down Expand Up @@ -131,7 +131,7 @@ Aggregate -> Caller: result
title Code diagram for Equinox.EventStore / Equinox.SqlStreamStore Query\nwith out of sync cache e.g. another process (when using a snapshotting Access Strategy)

actor Caller order 20
box "Equinox.Stream"
box "Equinox.Decider"
participant Stream order 40
end box
participant Aggregate order 50
Expand Down
2 changes: 1 addition & 1 deletion diagrams/MemoryStore.puml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ together {
Service
]
rectangle stream <<Component>> [
Equinox.Stream
Equinox.Decider
]
interface IStream <<Component>>
}
Expand Down
Loading