From e603664041dc66637d16eda9ad5731a210526650 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 11 Sep 2023 11:04:36 +0100 Subject: [PATCH] Release 3.0.0-rc.14 Target FsCodec 3.0.0-rc.13 Tweaks --- DOCUMENTATION.md | 117 ++++++++++++------ samples/Infrastructure/Services.fs | 2 +- samples/Infrastructure/Store.fs | 2 +- samples/Store/Domain/Domain.fsproj | 4 +- samples/Tutorial/Tutorial.fsproj | 2 +- samples/Tutorial/Upload.fs | 2 +- src/Equinox.CosmosStore/CosmosStore.fs | 4 +- src/Equinox.DynamoStore/DynamoStore.fs | 67 +++++----- src/Equinox/Equinox.fsproj | 2 +- .../AccessStrategies.fs | 2 +- .../DocumentStoreIntegration.fs | 4 +- tools/Equinox.Tool/Program.fs | 7 +- 12 files changed, 127 insertions(+), 88 deletions(-) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index a5adceb26..4b4149397 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -315,19 +315,22 @@ highly recommended to use the following canonical skeleton layout: ```fsharp module Aggregate -module Stream = +module private Stream = let [] Category = "category" let id = FsCodec.StreamId.gen Id.toString -(* Optionally, Helpers/Types *) +(* Optionally (rarely) Helpers/Types *) // NOTE - these types and the union case names reflect the actual storage // formats and hence need to be versioned with care [] module Events = + type Snapshotted = ... // NOTE: event body types past tense with same name as case + type Event = | ... + | [] Snapshotted of Snapshotted // NOTE: Snapshotted event explictly named to remind one can/should version it // optionally: `encode`, `tryDecode` (only if you're doing manual decoding) let codec = FsCodec ... Codec.Create(...) ``` @@ -339,6 +342,44 @@ Some notes about the intents being satisfied here: sibling code in adjacent `module`s should not be using them directly (in general interaction should be via the `type Service`) +✅ DO keep `module Stream` visibility `private`, present via `module Reactions` + +If the composition of stream names is relevant for Reactions processing, expose relevant helpers in a `module Reactions` facade. +For instance, rather than having external reaction logic refer to `Aggregate.Stream.Category`, expose a facade such as: + +```fsharp +module Reactions = + + let streamName = Stream.name + let deletionNamePrefix tenantIdStr = $"%s{Stream.Category}-%s{tenantIdStr}" +``` + +✅ DO use tupled arguments for the `Stream.id` function + +All the inputs of which the `StreamId` is composed should be represented as one argument: + +```fsharp +✅ let id struct (tenantId, clientId) = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString (tenantId, clientId) +✅ let id = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString +❌ let id tenantId clientId = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString (tenantId, clientId) +``` + +✅ DO keep `module Stream` visibility `private`, present via `module Reactions` + +If the composition of stream names is relevant for Reactions processing, expose relevant helpers in a `module Reactions` facade. +For instance, rather than having external reaction logic refer to `Aggregate.Stream.Category`, expose a facade such as: + +```fsharp +module Reactions = + + let streamName = Stream.name + let deletionNamePrefix tenantIdStr = $"%s{Stream.Category}-%s{tenantIdStr}" + let [] (|For|_|) = Stream.tryDecode + let [] (|Decode|_|) = function + | struct (For id, _) & Streams.Decode dec events -> ValueSome struct (id, events) + | _ -> ValueNone +``` + ```fsharp module Fold = @@ -411,17 +452,18 @@ either within the `module Aggregate`, or somewhere outside closer to the ```fsharp let defaultCacheDuration = System.TimeSpan.FromMinutes 20. -let cacheStrategy = Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) +let cacheStrategy cache = Equinox.CosmosStore.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) module EventStore = - let accessStrategy = Equinox.EventStoreDb.AccessStrategy.RollingSnapshots (Fold.isOrigin, Fold.snapshot) + let accessStrategy = Equinox.EventStoreDb.AccessStrategy.RollingSnapshots Fold.Snapshot.config let category (context, cache) = - Equinox.EventStore.EventStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + Equinox.EventStore.EventStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache) module Cosmos = let accessStrategy = Equinox.CosmosStore.AccessStrategy.Snapshot Fold.Snapshot.config let category (context, cache) = - Equinox.CosmosStore.CosmosStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy) + Equinox.CosmosStore.CosmosStoreCategory(context, Stream.Category, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache) +``` ### `MemoryStore` Storage Binding Module @@ -432,7 +474,7 @@ can use the `MemoryStore` in the context of your tests: ```fsharp module MemoryStore = let category (store: Equinox.MemoryStore.VolatileStore) = - Equinox.MemoryStore.MemoryStoreCategory(store, Category, Events.codec, Fold.fold, Fold.initial) + Equinox.MemoryStore.MemoryStoreCategory(store, Stream.Category, Events.codec, Fold.fold, Fold.initial) ``` Typically that binding module can live with your test helpers rather than @@ -479,7 +521,7 @@ events on a given category of stream: tests and used to parameterize the Category's storage configuration._. Sometimes named `apply`) -- `interpret: (context/command etc ->) 'state -> event' list` or `decide: (context/command etc ->) 'state -> 'result*'event list`: responsible for _Deciding_ (in an [idempotent](https://en.wikipedia.org/wiki/Idempotence) manner) how the intention represented by `context/command` should be mapped with regard to the provided `state` in terms of: +- `interpret: (context/command etc ->) 'state -> 'event[]` or `decide: (context/command etc ->) 'state -> 'result * 'event[]`: responsible for _Deciding_ (in an [idempotent](https://en.wikipedia.org/wiki/Idempotence) manner) how the intention represented by `context/command` should be mapped with regard to the provided `state` in terms of: a) the `'events` that should be written to the stream to record the decision b) (for the `'result` in the `decide` signature) any response to be returned to the invoker (NB returning a result likely represents a violation of the [CQS](https://en.wikipedia.org/wiki/Command%E2%80%93query_separation) and/or CQRS principles, [see Synchronous Query in the Glossary](#glossary)) @@ -574,10 +616,10 @@ type Command = | Remove of itemId: int let interpret command state = - let has id = state |> List.exits (is id) + let has id = state |> List.exists (is id) match command with - | Add item -> if has item.id then [] else [Added item] - | Remove id -> if has id then [Removed id] else [] + | Add item -> if has item.id then [||] else [| Added item |] + | Remove id -> if has id then [| Removed id |] else [||] (* * Optional: Snapshot/Unfold-related functions to allow establish state @@ -710,15 +752,15 @@ follow! ```fsharp type Equinox.Decider(...) = -StoreIntegration + // Run interpret function with present state, retrying with Optimistic Concurrency - member _.Transact(interpret: State -> Event list): Async + member _.Transact(interpret: 'state -> 'event[]): Async // Run decide function with present state, retrying with Optimistic Concurrency, yielding Result on exit - member _.Transact(decide: State -> Result*Event list): Async + member _.Transact(decide: 'state -> 'result * 'event[]): Async<'result> // Runs a Null Flow that simply yields a `projection` of `Context.State` - member _.Query(projection: State -> View): Async + member _.Query(projection: 'state -> 'view): Async<'view> ``` ### Favorites walkthrough @@ -789,8 +831,8 @@ type Command = | Remove of string let interpret command state = match command with - | Add sku -> if state |> List.contains sku then [] else [Added sku] - | Remove sku -> if state |> List.contains sku |> not then [] else [Removed sku] + | Add sku -> if state |> List.contains sku then [||] else [| Added sku |] + | Remove sku -> if state |> List.contains sku |> not then [||] else [| Removed sku |] ``` Command handling should almost invariably be implemented in an @@ -1006,13 +1048,13 @@ let fold = Array.fold evolve type Command = Add of Todo | Update of Todo | Delete of id: int | Clear let interpret c (state: State) = match c with - | Add value -> [Added { value with id = state.nextId }] + | Add value -> [| Added { value with id = state.nextId } |] | Update value -> match state.items |> List.tryFind (function { id = id } -> id = value.id) with - | Some current when current <> value -> [Updated value] - | _ -> [] - | Delete id -> if state.items |> List.exists (fun x -> x.id = id) then [Deleted id] else [] - | Clear -> if state.items |> List.isEmpty then [] else [Cleared] + | Some current when current <> value -> [| Updated value |] + | _ -> [||] + | Delete id -> if state.items |> List.exists (fun x -> x.id = id) then [| Deleted id |] else [||] + | Clear -> if state.items |> List.isEmpty then [||] else [| Cleared |] ``` - Note `Add` does not adhere to the normal idempotency constraint, being @@ -1130,7 +1172,7 @@ In this case, the Decision Process is `interpret`ing the _Command_ in the context of a `'state`. The function signature is: -`let interpret (context, command, args) state: Events.Event list` +`let interpret (context, command, args) state: Events.Event[]` Note the `'state` is the last parameter; it's computed and supplied by the Equinox Flow. @@ -1147,12 +1189,12 @@ conflicting write have taken place since the loading of the state_ ```fsharp -let interpret (context, command) state: Events.Event list = +let interpret (context, command) state: Events.Event[] = match tryCommand context command state with | None -> - [] // not relevant / already in effect + [||] // not relevant / already in effect | Some eventDetails -> // accepted, mapped to event details record - [Event.HandledCommand eventDetails] + [| Events.HandledCommand eventDetails |] type Service internal (resolve: ClientId -> Equinox.Decider) @@ -1180,12 +1222,12 @@ signature: you're both potentially emitting events and yielding an outcome or projecting some of the 'state'. In this case, the signature is: `let decide (context, command, args) state: -'result * Events.Event list` +'result * Events.Event[]` -Note that the return value is a _tuple_ of `('result,Event list): +Note that the return value is a _tuple_ of `('result, Events.Event[])`: - the `fst` element is returned from `decider.Transact` - the `snd` element of the tuple represents the events (if any) that should - represent the state change implied by the request.with + represent the state change implied by the request. Note if the decision function yields events, and a conflict is detected, the flow may result in the `decide` function being rerun with the conflicting state @@ -1193,7 +1235,7 @@ until either no events are emitted, or there were on further conflicting writes supplied by competing writers. ```fsharp -let decide (context, command) state: int * Events.Event list = +let decide (context, command) state: int * Events.Event[] = // ... if `snd` contains event, they are written // `fst` (an `int` in this instance) is returned as the outcome to the caller @@ -1259,7 +1301,7 @@ let validateInterpret contextAndOrArgsAndOrCommand state = let validateIdempotent contextAndOrArgsAndOrCommand state' = let events' = interpret contextAndOrArgsAndOrCommand state' match events' with - | [|] -> () + | [||] -> () // TODO add clauses to validate edge cases that should still generate events on a re-run | xs -> failwithf "Not idempotent; Generated %A in response to %A" xs contextAndOrArgsAndOrCommand ``` @@ -1317,7 +1359,7 @@ type Service internal (resolve: CartId -> Equinox.Decider = let decider = resolve cartId - let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad + let opt = if optimistic then Equinox.LoadOption.AnyCachedValue else Equinox.LoadOption.RequireLoad decider.Transact(fun state -> async { match prepare with None -> () | Some prep -> do! prep return interpretMany Fold.fold (Seq.map interpret commands) state }, opt) @@ -1379,7 +1421,7 @@ type Accumulator<'event, 'state>(fold: 'state -> 'event[] -> 'state, originState type Service ... = member _.Run(cartId, optimistic, commands: Command seq, ?prepare): Async = let decider = resolve cartId - let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad + let opt = if optimistic then Equinox.LoadOption.AnyCachedValue else Equinox.LoadOption.RequireLoad decider.Transact(fun state -> async { match prepare with None -> () | Some prep -> do! prep let acc = Accumulator(Fold.fold, state) @@ -1453,11 +1495,8 @@ Key aspects relevant to the Equinox programming model: - In general, EventStore provides excellent caching and performance characteristics intrinsically by virtue of its design -- Projections can be managed by either tailing streams (including the synthetic - `$all` stream) or using the Projections facility - there's no obvious reason - to wrap it, aside from being able to uniformly target CosmosDB (i.e. one - could build an `Equinox.EventStore.Projection` library and an `eqx project - stats es` with very little code). +- Projections can be managed by the `Propulsion.EventStoreDb` library; there is also + an `eqx project stats es` feature). - In general event streams should be considered append only, with no mutations or deletes @@ -2339,7 +2378,7 @@ For Domain Events in an event-sourced model, their permanence and immutability i It should be noted with regard to such requirements: - EventStoreDB does not present any APIs for mutation of events, though deleting events is a fully supported operation (although that can be restricted). Rewrites are typically approached by doing an offline database rebuild. -- `Equinox.Cosmos` and `Equinox.CosmosStore` include support for pruning events (only) from the head of a stream. Obviously, there's nothing stopping you deleting or altering the Batch documents out of band via the underlying CosmosDB APIs directly (Note however that the semantics of document ordering within a logical partition means its strongly advised not to mutate any event Batch documents as this will cause their ordering to become incorrect relative to other events, invalidating a key tenet that Change Feed Processors rely on). +- `Equinox.CosmosStore` includes support for pruning events (only) from the head of a stream. Obviously, there's nothing stopping you deleting or altering the Batch documents out of band via the underlying CosmosDB APIs directly (Note however that the semantics of document ordering within a logical partition means its strongly advised not to mutate any event Batch documents as this will cause their ordering to become incorrect relative to other events, invalidating a key tenet that Change Feed Processors rely on). ### Growth handling strategies diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs index cc325bdd2..533d82394 100644 --- a/samples/Infrastructure/Services.fs +++ b/samples/Infrastructure/Services.fs @@ -21,7 +21,7 @@ type Store(store) = CosmosStore.CosmosStoreCategory<'event,'state,_>(store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, caching) | Store.Config.Dynamo (store, caching, unfolds) -> let accessStrategy = if unfolds then DynamoStore.AccessStrategy.Snapshot snapshot else DynamoStore.AccessStrategy.Unoptimized - DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Deflate.EncodeTryDeflate codec, fold, initial, accessStrategy, caching) + DynamoStore.DynamoStoreCategory<'event,'state,_>(store, name, FsCodec.Compression.EncodeTryCompress codec, fold, initial, accessStrategy, caching) | Store.Config.Es (context, caching, unfolds) -> let accessStrategy = if unfolds then EventStoreDb.AccessStrategy.RollingSnapshots snapshot else EventStoreDb.AccessStrategy.Unoptimized EventStoreDb.EventStoreCategory<'event,'state,_>(context, name, codec, fold, initial, accessStrategy, caching) diff --git a/samples/Infrastructure/Store.fs b/samples/Infrastructure/Store.fs index e2dc1006c..261786e45 100644 --- a/samples/Infrastructure/Store.fs +++ b/samples/Infrastructure/Store.fs @@ -211,7 +211,7 @@ module Dynamo = let config (log : ILogger) (cache, unfolds) (a : Arguments) = a.Connector.LogConfiguration(log) - let client = a.Connector.CreateDynamoDbClient() |> DynamoStoreClient + let client = a.Connector.CreateDynamoStoreClient() let context = DynamoStoreContext(client, a.Table, maxBytes = a.TipMaxBytes, queryMaxItems = a.QueryMaxItems, ?tipMaxEvents = a.TipMaxEvents, ?archiveTableName = a.ArchiveTable) context.LogConfiguration(log, "Main", a.Table, ?archiveTableName = a.ArchiveTable) diff --git a/samples/Store/Domain/Domain.fsproj b/samples/Store/Domain/Domain.fsproj index b750acd7f..9808cd2f6 100644 --- a/samples/Store/Domain/Domain.fsproj +++ b/samples/Store/Domain/Domain.fsproj @@ -17,8 +17,8 @@ - - + + diff --git a/samples/Tutorial/Tutorial.fsproj b/samples/Tutorial/Tutorial.fsproj index c6b32cd8f..da41e3178 100644 --- a/samples/Tutorial/Tutorial.fsproj +++ b/samples/Tutorial/Tutorial.fsproj @@ -28,7 +28,7 @@ - + diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs index ea7cf101a..d099d856e 100644 --- a/samples/Tutorial/Upload.fs +++ b/samples/Tutorial/Upload.fs @@ -47,7 +47,7 @@ let decide (value : UploadId) (state : Fold.State) : Choice * | None -> Choice1Of2 value, [| Events.IdAssigned { value = value} |] | Some value -> Choice2Of2 value, [||] -type Service internal (resolve : CompanyId * PurchaseOrderId -> Equinox.Decider) = +type Service internal (resolve: struct (CompanyId * PurchaseOrderId) -> Equinox.Decider) = member _.Sync(companyId, purchaseOrderId, value) : Async> = let decider = resolve (companyId, purchaseOrderId) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 415d18e18..ae809a2df 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -1347,8 +1347,8 @@ type CosmosStoreCategory<'event, 'state, 'req> = match access with | AccessStrategy.Unoptimized -> (fun _ -> false), false, Choice1Of3 () | AccessStrategy.LatestKnownEvent -> (fun _ -> true), true, Choice2Of3 (fun events _ -> events |> Array.last |> Array.singleton) - | AccessStrategy.Snapshot (isOrigin, toSnapshot) -> isOrigin, true, Choice2Of3 (fun _ state -> toSnapshot state |> Array.singleton) - | AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ state -> unfold state) + | AccessStrategy.Snapshot (isOrigin, toSnapshot) -> isOrigin, true, Choice2Of3 (fun _ -> toSnapshot >> Array.singleton) + | AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ -> unfold) | AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton) | AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute { inherit Equinox.Category<'event, 'state, 'req>(name, diff --git a/src/Equinox.DynamoStore/DynamoStore.fs b/src/Equinox.DynamoStore/DynamoStore.fs index 305a57ca0..6cb253eab 100644 --- a/src/Equinox.DynamoStore/DynamoStore.fs +++ b/src/Equinox.DynamoStore/DynamoStore.fs @@ -45,8 +45,8 @@ type Event = causationId: string option } interface ITimelineEvent with member x.Index = x.i - member x.IsUnfold = false - member x.Context = null + member _.IsUnfold = false + member _.Context = null member x.Size = Event.Bytes x member x.EventType = x.c member x.Data = x.d @@ -80,15 +80,15 @@ type Unfold = m: InternalBody } interface ITimelineEvent with member x.Index = x.i - member x.IsUnfold = true - member x.Context = null + member _.IsUnfold = true + member _.Context = null member x.Size = Unfold.Bytes x member x.EventType = x.c member x.Data = x.d member x.Meta = x.m member _.EventId = Guid.Empty - member x.CorrelationId = null - member x.CausationId = null + member _.CorrelationId = null + member _.CausationId = null member x.Timestamp = x.t static member Bytes(x: Unfold) = x.c.Length + InternalBody.bytes x.d + InternalBody.bytes x.m + 50 module Unfold = @@ -426,21 +426,21 @@ module private Async = type internal BatchIndices = { isTip: bool; index: int64; n: int64 } type StoreTable(name, createContext: (RequestMetrics -> unit) -> TableContext) = - member _.Context(collector) = createContext collector - member _.Name = name + member _.CreateContext(collector) = createContext collector + member val Name = name /// As per Equinox.CosmosStore, we assume the table to be provisioned correctly (see DynamoStoreContext.Establish(ConnectMode) re validating on startup) static member Create(client, tableName) = let createContext collector = TableContext(client, tableName, metricsCollector = collector) StoreTable(tableName, createContext) - member x.TryGetTip(stream: string, consistentRead, ct): Task = task { + member _.TryGetTip(stream: string, consistentRead, ct): Task = task { let rm = Metrics() let context = createContext rm.Add let pk = Batch.tableKeyForStreamTip stream let! item = context.TryGetItemAsync(pk, consistentRead) |> Async.executeAsTask ct return item |> Option.map Batch.ofSchema, rm.Consumed } - member x.TryUpdateTip(stream: string, updateExpr: Quotations.Expr Batch.Schema>, ct, ?precondition): Task = task { + member _.TryUpdateTip(stream: string, updateExpr: Quotations.Expr Batch.Schema>, ct, ?precondition): Task = task { let rm = Metrics() let context = createContext rm.Add let pk = Batch.tableKeyForStreamTip stream @@ -479,7 +479,7 @@ type StoreTable(name, createContext: (RequestMetrics -> unit) -> TableContext () | le -> yield! aux (index + 1, le) } aux (0, None) - member x.DeleteItem(stream: string, i, ct): Task = task { + member _.DeleteItem(stream: string, i, ct): Task = task { let rm = Metrics() let context = createContext rm.Add let pk = TableKey.Combined(stream, i) @@ -567,7 +567,7 @@ module internal Sync = let etag' = let g = Guid.NewGuid() in g.ToString "N" let actions = generateRequests stream requestArgs etag' let rm = Metrics() - try do! let context = table.Context(rm.Add) + try do! let context = table.CreateContext(rm.Add) match actions with | [ TransactWrite.Put (item, Some cond) ] -> context.PutItemAsync(item, cond) |> Async.Ignore | [ TransactWrite.Update (key, Some cond, updateExpr) ] -> context.UpdateItemAsync(key, updateExpr, cond) |> Async.Ignore @@ -1148,6 +1148,23 @@ open Equinox.Core open Equinox.DynamoStore.Core open System +type ProvisionedThroughput = FSharp.AWS.DynamoDB.ProvisionedThroughput +type Throughput = FSharp.AWS.DynamoDB.Throughput + +type StreamViewType = Amazon.DynamoDBv2.StreamViewType +type Streaming = FSharp.AWS.DynamoDB.Streaming + +[] +type ConnectMode = + | SkipVerify + | Verify + | CreateIfNotExists of Throughput +module internal ConnectMode = + let apply client tableName = function + | SkipVerify -> async { () } + | Verify -> Initialization.verify client tableName + | CreateIfNotExists throughput -> Initialization.createIfNotExists client tableName (throughput, Initialization.StreamingMode.New) + type [] ConnectionMode = AwsEnvironment of systemName: string | AwsKeyCredentials of serviceUrl: string /// Manages Creation and configuration of an IAmazonDynamoDB connection @@ -1184,26 +1201,10 @@ type DynamoStoreConnector(clientConfig: Amazon.DynamoDBv2.AmazonDynamoDBConfig, | None -> new Amazon.DynamoDBv2.AmazonDynamoDBClient(clientConfig) // this uses credentials=FallbackCredentialsFactory.GetCredentials() | Some credentials -> new Amazon.DynamoDBv2.AmazonDynamoDBClient(credentials, clientConfig) :> Amazon.DynamoDBv2.IAmazonDynamoDB - -type ProvisionedThroughput = FSharp.AWS.DynamoDB.ProvisionedThroughput -type Throughput = FSharp.AWS.DynamoDB.Throughput - -type StreamViewType = Amazon.DynamoDBv2.StreamViewType -type Streaming = FSharp.AWS.DynamoDB.Streaming - -[] -type ConnectMode = - | SkipVerify - | Verify - | CreateIfNotExists of Throughput -module internal ConnectMode = - let apply client tableName = function - | SkipVerify -> async { () } - | Verify -> Initialization.verify client tableName - | CreateIfNotExists throughput -> Initialization.createIfNotExists client tableName (throughput, Initialization.StreamingMode.New) + member x.CreateDynamoStoreClient() = x.CreateDynamoDbClient() |> DynamoStoreClient /// Holds the DynamoDB Client(s). There should not need to be more than a single instance per process -type DynamoStoreClient(client: Amazon.DynamoDBv2.IAmazonDynamoDB, +and DynamoStoreClient(client: Amazon.DynamoDBv2.IAmazonDynamoDB, // Client to use for fallback tables. // Events that have been archived and purged (and hence are missing from the primary) are retrieved from this Table [] ?fallbackClient: Amazon.DynamoDBv2.IAmazonDynamoDB) = @@ -1298,8 +1299,8 @@ type DynamoStoreCategory<'event, 'state, 'req> = match access with | AccessStrategy.Unoptimized -> (fun _ -> false), false, Choice1Of3 () | AccessStrategy.LatestKnownEvent -> (fun _ -> true), true, Choice2Of3 (fun events _ -> events |> Array.last |> Array.singleton) - | AccessStrategy.Snapshot (isOrigin, toSnapshot) -> isOrigin, true, Choice2Of3 (fun _ state -> toSnapshot state |> Array.singleton<'event>) - | AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ (state: 'state) -> unfold state) + | AccessStrategy.Snapshot (isOrigin, toSnapshot) -> isOrigin, true, Choice2Of3 (fun _ -> toSnapshot >> Array.singleton<'event>) + | AccessStrategy.MultiSnapshot (isOrigin, unfold) -> isOrigin, true, Choice2Of3 (fun _ -> unfold) | AccessStrategy.RollingState toSnapshot -> (fun _ -> true), true, Choice3Of3 (fun _ state -> Array.empty, toSnapshot state |> Array.singleton) | AccessStrategy.Custom (isOrigin, transmute) -> isOrigin, true, Choice3Of3 transmute { inherit Equinox.Category<'event, 'state, 'req>(name, @@ -1383,7 +1384,7 @@ type EventsContext /// Callers should implement appropriate idempotent handling, or use Equinox.Decider for that purpose member x.Sync(streamName, position, events: IEventData<_>[]): Async> = async { let store, stream = resolve streamName - match! store.Sync(log, stream, Some position, Position.toIndex >> Sync.Exp.Version, position.index, events, Seq.empty) with + match! x.Sync(log, stream, Some position, Position.toIndex >> Sync.Exp.Version, position.index, events, Seq.empty) with | InternalSyncResult.Written (Token.Unpack pos) -> return AppendResult.Ok (Position.flatten pos) | InternalSyncResult.ConflictUnknown -> return AppendResult.ConflictUnknown } #endif diff --git a/src/Equinox/Equinox.fsproj b/src/Equinox/Equinox.fsproj index 91b05631b..e1f18b2d1 100644 --- a/src/Equinox/Equinox.fsproj +++ b/src/Equinox/Equinox.fsproj @@ -24,7 +24,7 @@ contentfiles - + diff --git a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs index 14bd63b63..e69f299ce 100644 --- a/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs +++ b/tests/Equinox.CosmosStore.Integration/AccessStrategies.fs @@ -43,7 +43,7 @@ module SequenceCheck = | Add of {| value : int |} interface TypeShape.UnionContract.IUnionContract #if STORE_DYNAMO - let codec = FsCodec.SystemTextJson.Codec.Create() |> FsCodec.Deflate.EncodeTryDeflate + let codec = FsCodec.SystemTextJson.Codec.Create() |> FsCodec.Compression.EncodeTryCompress #else let codec = FsCodec.SystemTextJson.CodecJsonElement.Create() #endif diff --git a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs index 8ff590019..caa1ebefb 100644 --- a/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/DocumentStoreIntegration.fs @@ -16,7 +16,7 @@ open Equinox.CosmosStore.Integration.CosmosFixtures module Cart = let fold, initial = Cart.Fold.fold, Cart.Fold.initial #if STORE_DYNAMO - let codec = Cart.Events.codec |> FsCodec.Deflate.EncodeTryDeflate + let codec = Cart.Events.codec |> FsCodec.Compression.EncodeTryCompress #else let codec = Cart.Events.codecJe #endif @@ -48,7 +48,7 @@ module Cart = module ContactPreferences = let fold, initial = ContactPreferences.Fold.fold, ContactPreferences.Fold.initial #if STORE_DYNAMO - let codec = ContactPreferences.Events.codec |> FsCodec.Deflate.EncodeTryDeflate + let codec = ContactPreferences.Events.codec |> FsCodec.Compression.EncodeTryCompress #else let codec = ContactPreferences.Events.codecJe #endif diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 6f15cae65..41a7ed80d 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -450,10 +450,9 @@ module CosmosStats = let inParallel = p.Contains Parallel let connector, dName, cName = CosmosInit.connect log sp let container = connector.CreateUninitialized().GetContainer(dName, cName) - let ops = - [ if doS then yield "Streams", """SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" """ - if doD then yield "Documents", """SELECT VALUE COUNT(1) FROM c""" - if doE then yield "Events", """SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1" """ ] + let ops = [| if doS then "Streams", """SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" """ + if doD then "Documents", """SELECT VALUE COUNT(1) FROM c""" + if doE then "Events", """SELECT VALUE SUM(c.n) FROM c WHERE c.id="-1" """ |] log.Information("Computing {measures} ({mode})", Seq.map fst ops, (if inParallel then "in parallel" else "serially")) ops |> Seq.map (fun (name, sql) -> async { log.Debug("Running query: {sql}", sql)