Skip to content

Commit

Permalink
Add queryLogLevel 4.1.0-alpha.7
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 8, 2024
1 parent e9d7204 commit 94b0a2f
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions src/Equinox.CosmosStore/CosmosStoreLinq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,14 @@ module Internal =
interval = interval; bytes = totalOds; count = items; ru = totalRu } in log |> Log.event evt
log.Information("EqxCosmos {action:l} {count} ({trips}r {totalRtt:f0}ms; {rdc}i {rds:f2}>{ods:f2} MiB) {rc:f2} RU {latency} ms",
"Index", items, responses, totalRtt.TotalMilliseconds, totalRdc, miB totalRds, miB totalOds, totalRu, interval.ElapsedMilliseconds) }
/// Runs a query that can by hydrated as 'T
let enum<'T> (log: ILogger) (container: Container) cat (queryDefinition: QueryDefinition): TaskSeq<'T> =
container.GetItemQueryIterator<'T>(queryDefinition) |> toAsyncEnum<'T> log container cat
/// Runs a query that renders 'T, Hydrating the results as 'P (can be the same types but e.g. you might want to map an object to a JsonElement etc)
let enum<'T, 'P> (log: ILogger) (container: Container) cat (query: IQueryable<'T>): TaskSeq<'P> =
let enumAs<'T, 'P> (log: ILogger) (container: Container) cat logLevel (query: IQueryable<'T>): TaskSeq<'P> =
let queryDefinition = query.ToQueryDefinition()
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText)
container.GetItemQueryIterator<'P>(queryDefinition) |> toAsyncEnum<'P> log container cat
if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.query {cat} {query}", cat, queryDefinition.QueryText)
enum<'P> log container cat queryDefinition
module AggregateOp =
/// Runs one of the typical Cosmos SDK extensions, e.g. CountAsync, logging the costs
let [<EditorBrowsable(EditorBrowsableState.Never)>] exec (log: ILogger) (container: Container) (op: string) (cat: string) (query: IQueryable<'T>) run render: System.Threading.Tasks.Task<'R> = task {
Expand All @@ -118,21 +121,21 @@ module Internal =
op, cat, summary, m.RetrievedDocumentCount, miB m.RetrievedDocumentSize, miB totalOds, totalRu, interval.ElapsedMilliseconds)
return res }
/// Runs query.CountAsync, with instrumentation equivalent to what query provides
let countAsync (log: ILogger) container cat (query: IQueryable<'T>) ct =
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText)
let countAsync (log: ILogger) container cat logLevel (query: IQueryable<'T>) ct =
if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.count {cat} {query}", cat, query.ToQueryDefinition().QueryText)
exec log container "count" cat query (_.CountAsync(ct)) id<int>
module Scalar =
/// Generates a TOP 1 SQL query
let top1 (query: IQueryable<'T>) =
query.Take(1)
/// Handles a query that's expected to yield 0 or 1 result item
let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> =
let tryHeadAsync<'T, 'R> (log: ILogger) (container: Container) cat logLevel (query: IQueryable<'T>) (_ct: CancellationToken): Task<'R option> =
let queryDefinition = (top1 query).ToQueryDefinition()
if log.IsEnabled Serilog.Events.LogEventLevel.Debug then log.Debug("CosmosStoreQuery.tryScalar {cat} {query}", queryDefinition.QueryText)
if log.IsEnabled logLevel then log.Write(logLevel, "CosmosStoreQuery.tryScalar {cat} {query}", queryDefinition.QueryText)
container.GetItemQueryIterator<'R>(queryDefinition) |> Query.toAsyncEnum log container cat |> TaskSeq.tryHead
type Projection<'T, 'M>(query, category, container, enum: IQueryable<'T> -> TaskSeq<'M>, count: IQueryable<'T> -> CancellationToken -> Task<int>) =
static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M) =
Projection<'T, 'M>(q, cat, c, Query.enum<'T, 'P> log c cat >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat)
static member Create<'P>(q, cat, c, log, hydrate: 'P -> 'M, logLevel) =
Projection<'T, 'M>(q, cat, c, Query.enumAs<'T, 'P> log c cat logLevel >> TaskSeq.map hydrate, AggregateOp.countAsync log c cat logLevel)
member _.Enum: TaskSeq<'M> = query |> enum
member x.EnumPage(skip, take): TaskSeq<'M> = query |> Query.offsetLimit (skip, take) |> enum
member _.CountAsync: CancellationToken -> Task<int> = query |> count
Expand Down Expand Up @@ -162,6 +165,14 @@ type SnAndSnap<'I>() =
Expression.Bind(snapMember, snapExpression.Body.Replace(snapExpression.Parameters[0], param)) |]),
[| param |])

/// Represents a query projecting information values from an Index and/or Snapshots with a view to rendering the items and/or a count
type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) =
member _.Enum: TaskSeq<'M> = inner.Enum
member _.EnumPage(skip, take): TaskSeq<'M> = inner.EnumPage(skip, take)
member _.CountAsync(ct: CancellationToken): Task<int> = inner.CountAsync ct
member _.Count(): Async<int> = inner.CountAsync |> Async.call
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Inner = inner

/// Helpers for Querying and Projecting results based on relevant aspects of Equinox.CosmosStore's storage schema
module Index =

Expand All @@ -188,27 +199,23 @@ module Index =
container.GetItemLinqQueryable<Item<'I>>().Where(fun d -> d.p.StartsWith(prefix) && d.u[0].c = caseName)

/// Returns the StreamName (from the `p` field) for a 0/1 item query; only the TOP 1 item is returned
let tryGetStreamNameAsync log cat container (query: IQueryable<Item<'I>>) ct =
Internal.Scalar.tryHeadAsync<string, FsCodec.StreamName> log cat container (query.Select(fun x -> x.p)) ct
let tryGetStreamNameAsync log cat logLevel container (query: IQueryable<Item<'I>>) ct =
Internal.Scalar.tryHeadAsync<string, FsCodec.StreamName> log cat logLevel container (query.Select(fun x -> x.p)) ct

/// Query the items, returning the Stream name and the Snapshot as a JsonElement (Decompressed if applicable)
let projectStreamNameAndSnapshot<'I> snapExpression: Expression<Func<Item<'I>, SnAndSnap<'I>>> =
// a very ugly workaround for not being able to write query.Select<Item<'I>,Internal.SnAndSnap<'I>>(fun x -> { p = x.p; snap = x.u[0].d })
let pExpression item = Expression.PropertyOrField(item, nameof Unchecked.defaultof<Item<'I>>.p)
SnAndSnap.CreateItemQueryLambda(pExpression, snapExpression)

/// Represents a query projecting information values from an Index and/or Snapshots with a view to rendering the items and/or a count
type Query<'T, 'M>(inner: Internal.Projection<'T, 'M>) =
member _.Enum: TaskSeq<'M> = inner.Enum
member _.EnumPage(skip, take): TaskSeq<'M> = inner.EnumPage(skip, take)
member _.CountAsync(ct: CancellationToken): Task<int> = inner.CountAsync ct
member _.Count(): Async<int> = inner.CountAsync |> Async.call
[<EditorBrowsable(EditorBrowsableState.Never)>] member val Inner = inner
let createSnAndSnapshotQuery<'I, 'M> log container cat logLevel (hydrate: SnAndSnap<System.Text.Json.JsonElement> -> 'M) (query: IQueryable<SnAndSnap<'I>>) =
Internal.Projection.Create(query, cat, container, log, hydrate, logLevel) |> Query<SnAndSnap<'I>, 'M>

/// Enables querying based on uncompressed Indexed values stored as secondary unfolds alongside the snapshot
[<NoComparison; NoEquality>]
type IndexContext<'I>(container, categoryName, caseName, log) =
type IndexContext<'I>(container, categoryName, caseName, log, [<O; D null>]?queryLogLevel) =

let queryLogLevel = defaultArg queryLogLevel Serilog.Events.LogEventLevel.Debug
member val Log = log
member val Description = $"{categoryName}/{caseName}" with get, set
member val Container = container
Expand All @@ -228,14 +235,18 @@ type IndexContext<'I>(container, categoryName, caseName, log) =
Index.byCategoryNameOnly<'I> container categoryName

/// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
member x.TryGetStreamNameWhereAsync(criteria: Expressions.Expression<Func<Index.Item<'I>, bool>>, ct) =
Index.tryGetStreamNameAsync x.Log container categoryName (x.ByCategory().Where criteria) ct
member x.TryGetStreamNameWhereAsync(criteria: Expressions.Expression<Func<Index.Item<'I>, bool>>, ct, [<O; D null>] ?logLevel) =
let logLevel = defaultArg logLevel queryLogLevel
Index.tryGetStreamNameAsync x.Log container categoryName logLevel (x.ByCategory().Where criteria) ct

/// Runs the query; yields the StreamName from the TOP 1 Item matching the criteria
member x.TryGetStreamNameWhere(criteria: Expressions.Expression<Func<Index.Item<'I>, bool>>): Async<FsCodec.StreamName option> =
(fun ct -> x.TryGetStreamNameWhereAsync(criteria, ct)) |> Async.call

/// Query the items, grabbing the Stream name and the Snapshot; The StreamName and the (Decompressed if applicable) Snapshot are passed to `hydrate`
member x.QueryStreamNameAndSnapshot(query: IQueryable<Index.Item<'I>>, selectBody: Expression<Func<Index.Item<'I>, 'I>>,
hydrate: SnAndSnap<System.Text.Json.JsonElement> -> 'M) =
Internal.Projection.Create(query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody), categoryName, container, x.Log, hydrate) |> Query
hydrate: SnAndSnap<System.Text.Json.JsonElement> -> 'M,
[<O; D null>] ?logLevel): Query<SnAndSnap<'I>, 'M> =
let logLevel = defaultArg logLevel queryLogLevel
query.Select(Index.projectStreamNameAndSnapshot<'I> selectBody)
|> Index.createSnAndSnapshotQuery x.Log container categoryName logLevel hydrate

0 comments on commit 94b0a2f

Please sign in to comment.