Skip to content

Commit

Permalink
feat(Cosmos InternalMetrics): Group by Container
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 27, 2024
1 parent 2b66792 commit f4c94d4
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 59 deletions.
120 changes: 63 additions & 57 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -267,80 +267,86 @@ module Log =
type internal Counter =
{ mutable rux100: int64; mutable count: int64; mutable ms: int64 }
static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
member x.Ingest(ru, ms) =
member x.Ingest(ms, ru) =
Interlocked.Increment(&x.count) |> ignore
Interlocked.Add(&x.rux100, int64 (ru*100.)) |> ignore
Interlocked.Add(&x.rux100, int64 (ru * 100.)) |> ignore
Interlocked.Add(&x.ms, ms) |> ignore
let inline private (|RcMs|) ({ interval = i; ru = ru }: Measurement) =
ru, int64 i.ElapsedMilliseconds
type internal Counters() =
let containers = System.Collections.Concurrent.ConcurrentDictionary<string, Counter>()
let create (_name: string) = Counter.Create()
member _.Ingest(container, ms, ru) = containers.GetOrAdd(container, create).Ingest(ms, ru)
member _.Containers = containers.Keys
member _.TryContainer container = match containers.TryGetValue container with true, t -> Some t | false, _ -> None
type Epoch() =
let epoch = System.Diagnostics.Stopwatch.StartNew()
member val internal Read = Counters() with get, set
member val internal Write = Counters() with get, set
member val internal Resync = Counters() with get, set
member val internal Conflict = Counters() with get, set
member val internal Prune = Counters() with get, set
member val internal Delete = Counters() with get, set
member val internal Trim = Counters() with get, set
member _.Stop() = epoch.Stop()
member _.Elapsed = epoch.Elapsed
let inline private (|BucketMsRu|) ({ container = c; interval = i; ru = ru }: Measurement) =
c, int64 i.ElapsedMilliseconds, ru
type LogSink() =
static let epoch = System.Diagnostics.Stopwatch.StartNew()
static member val internal Read = Counter.Create() with get, set
static member val internal Write = Counter.Create() with get, set
static member val internal Resync = Counter.Create() with get, set
static member val internal Conflict = Counter.Create() with get, set
static member val internal Prune = Counter.Create() with get, set
static member val internal Delete = Counter.Create() with get, set
static member val internal Trim = Counter.Create() with get, set
static let mutable epoch = Epoch()
static member Restart() =
LogSink.Read <- Counter.Create()
LogSink.Write <- Counter.Create()
LogSink.Resync <- Counter.Create()
LogSink.Conflict <- Counter.Create()
LogSink.Prune <- Counter.Create()
LogSink.Delete <- Counter.Create()
LogSink.Trim <- Counter.Create()
let span = epoch.Elapsed
epoch.Restart()
span
let fresh = Epoch()
let outgoing = Interlocked.Exchange(&epoch, fresh)
outgoing.Stop()
outgoing
interface Serilog.Core.ILogEventSink with
member _.Emit logEvent =
match logEvent with
| MetricEvent cm ->
match cm with
| Op ((Operation.Tip | Operation.Tip404 | Operation.Tip304 | Operation.Query), RcMs m) ->
LogSink.Read.Ingest m
| Op ((Operation.Tip | Operation.Tip404 | Operation.Tip304 | Operation.Query), BucketMsRu m) ->
epoch.Read.Ingest m
| QueryRes (_direction, _) -> ()
| Op (Operation.Write, RcMs m) -> LogSink.Write.Ingest m
| Op (Operation.Conflict, RcMs m) -> LogSink.Conflict.Ingest m
| Op (Operation.Resync, RcMs m) -> LogSink.Resync.Ingest m
| Op (Operation.Prune, RcMs m) -> LogSink.Prune.Ingest m
| Op (Operation.Write, BucketMsRu m) -> epoch.Write.Ingest m
| Op (Operation.Conflict, BucketMsRu m) -> epoch.Conflict.Ingest m
| Op (Operation.Resync, BucketMsRu m) -> epoch.Resync.Ingest m
| Op (Operation.Prune, BucketMsRu m) -> epoch.Prune.Ingest m
| PruneRes _ -> ()
| Op (Operation.Delete, RcMs m) -> LogSink.Delete.Ingest m
| Op (Operation.Trim, RcMs m) -> LogSink.Trim.Ingest m
| Op (Operation.Delete, BucketMsRu m) -> epoch.Delete.Ingest m
| Op (Operation.Trim, BucketMsRu m) -> epoch.Trim.Ingest m
| _ -> ()

/// Relies on feeding of metrics from Log through to Stats.LogSink
/// Use Stats.LogSink.Restart() to reset the start point (and stats) where relevant
let dump (log: ILogger) =
let res = Stats.LogSink.Restart()
let stats =
[ "Read", Stats.LogSink.Read
"Write", Stats.LogSink.Write
"Resync", Stats.LogSink.Resync
"Conflict", Stats.LogSink.Conflict
"Prune", Stats.LogSink.Prune
"Delete", Stats.LogSink.Delete
"Trim", Stats.LogSink.Trim ]
let mutable rows, totalCount, totalRRu, totalWRu, totalMs = 0, 0L, 0., 0., 0L
let logActivity name count ru lat =
let aru, ams = (if count = 0L then Double.NaN else ru/float count), (if count = 0L then Double.NaN else float lat/float count)
let rut = name |> function
| "TOTAL" -> "" | "Read" | "Prune" -> totalRRu <- totalRRu + ru; "R"
| _ -> totalWRu <- totalWRu + ru; "W"
log.Information("{name}: {count:n0}r {ru:n0}{rut:l}RU Average {avgRu:n1}RU {lat:n0}ms", name, count, ru, rut, aru, ams)
for name, stat in stats do
if stat.count <> 0L then
let ru = float stat.rux100 / 100.
totalCount <- totalCount + stat.count
totalMs <- totalMs + stat.ms
logActivity name stat.count ru stat.ms
rows <- rows + 1
// Yes, there's a minor race here between the use of the values and the reset
let duration = Stats.LogSink.Restart()
if rows > 1 then logActivity "TOTAL" totalCount (totalRRu + totalWRu) totalMs
let measures: (string * (TimeSpan -> float)) list = [ "s", _.TotalSeconds(*; "m", _.TotalMinutes; "h", _.TotalHours*) ]
let logPeriodicRate name count rru wru = log.Information("{rru:n1}R/{wru:n1}W CU @ {count:n0} rp{unit}", rru, wru, count, name)
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRRu/d) (totalWRu/d)
[|nameof res.Read, res.Read
nameof res.Write, res.Write
nameof res.Resync, res.Resync
nameof res.Conflict, res.Conflict
nameof res.Prune, res.Prune
nameof res.Delete, res.Delete
nameof res.Trim, res.Trim |]
for container in stats |> Seq.collect (fun (_n, stat) -> stat.Containers) |> Seq.distinct |> Seq.sort do
let mutable rows, totalCount, totalRRu, totalWRu, totalMs = 0, 0L, 0., 0., 0L
let logActivity name count ru lat =
let aru, ams = (if count = 0L then Double.NaN else ru/float count), (if count = 0L then Double.NaN else float lat/float count)
let rut = name |> function
| "TOTAL" -> "" | nameof res.Read | nameof res.Prune -> totalRRu <- totalRRu + ru; "R"
| _ -> totalWRu <- totalWRu + ru; "W"
log.Information("{container} {name}: {count:n0}r {ru:n0}{rut:l}RU Average {avgRu:n1}RU {lat:n0}ms", container, name, count, ru, rut, aru, ams)
for name, stat in stats do
match stat.TryContainer container with
| Some stat when stat.count <> 0L ->
let ru = float stat.rux100 / 100.
totalCount <- totalCount + stat.count
totalMs <- totalMs + stat.ms
logActivity name stat.count ru stat.ms
rows <- rows + 1
| _ -> ()
if rows > 1 then logActivity "TOTAL" totalCount (totalRRu + totalWRu) totalMs
let measures: (string * (TimeSpan -> float)) list = [ "s", _.TotalSeconds(*; "m", _.TotalMinutes; "h", _.TotalHours*) ]
let logPeriodicRate name count rru wru = log.Information("{container} {rru:n1}R/{wru:n1}W RU @ {count:n0} rp{unit}", container, rru, wru, count, name)
for uom, f in measures do let d = f res.Elapsed in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRRu/d) (totalWRu/d)

[<AutoOpen>]
module private MicrosoftAzureCosmosWrappers =
Expand Down
4 changes: 2 additions & 2 deletions src/Equinox.DynamoStore/DynamoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,15 @@ module Log =
let dump (log: ILogger) =
let res = Stats.LogSink.Restart()
let stats =
[ nameof res.Tip, res.Tip
[| nameof res.Tip, res.Tip
nameof res.Query, res.Query
nameof res.Append, res.Append
nameof res.Append409, res.Append409
nameof res.Calve, res.Calve
nameof res.Calve409, res.Calve409
nameof res.Prune, res.Prune
nameof res.Delete, res.Delete
nameof res.Trim, res.Trim ]
nameof res.Trim, res.Trim |]
for table in stats |> Seq.collect (fun (_n, stat) -> stat.Tables) |> Seq.distinct |> Seq.sort do
let mutable rows, totalCount, totalRRu, totalWRu, totalMs = 0, 0L, 0., 0., 0L
let logActivity name count ru lat =
Expand Down

0 comments on commit f4c94d4

Please sign in to comment.