WIP: Feature/live wpc reconfigure #263
Conversation
@haf I added some temporary code; I really don't know how to …, and whether Logary's own target dispatching needs to be implemented in …
```csharp
if (_fwClock != local.Item2)
{
    _logger = local.Item1.GetLogger(_name);
    _fwClock = local.Item2;
}
```
Why not bump the clock?
Every time we initialize the config, the clock increases by 1. When we log a message, we check whether the local clock is the same as the current config's clock; if not, we get a new logger from the new config, update the clock, and then log the message using the new logger.
e.g.
- the default fwClock is 1
- after initializing the logging config, the config clock will be 2
- if we initialize the logging config twice here, or again at some later point, the config clock will be 3
- when we then log a message through the facade API, it should update the config and the clock exactly once

So assigning the current clock to fwClock (rather than adding one) avoids further checks.
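A minimal sketch of the difference, runnable as an F# script (the helper names are hypothetical; only `config`/`fwClock` appear in the diffs below):

```fsharp
// The shared config clock bumps by one on every (re)initialisation.
let mutable configClock = 1u
let initialiseConfig () = configClock <- configClock + 1u

// Per-logger clock, starting at the default of 1.
let mutable fwClock = 1u

initialiseConfig () // config clock = 2
initialiseConfig () // config clock = 3, before this logger logged anything

// Incrementing: fwClock becomes 2, still <> 3, so the next log call sees
// another mismatch and refetches the logger needlessly.
if configClock <> fwClock then fwClock <- fwClock + 1u

// Assigning: fwClock jumps straight to 3 and is in sync after one refresh.
fwClock <- 1u
if configClock <> fwClock then fwClock <- configClock
```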
```diff
@@ -783,7 +783,7 @@ module Global =
       let cfg, cfgClock = !config
       if cfgClock <> fwClock then
         logger <- cfg.getLogger name
-        fwClock <- fwClock + 1u
+        fwClock <- cfgClock
```
Same here... Why?
see reply above
```diff
@@ -75,7 +75,7 @@ module internal Global =
       let cfg, cfgClock = !config // reread the config's clock after taking lock
       if cfgClock <> fwClock then // recheck after taking lock to avoid races
         logger <- cfg.getLogger name // get the current logger
-        fwClock <- fwClock + 1u // update instance's clock
+        fwClock <- cfgClock // update instance's clock
```
...and here...
src/Logary/Registry.fs
Outdated
```fsharp
let targetName = Message.tryGetContext "target" message
match targetName with
| Some (String targetName) ->
  let subscriber = HashMap.tryFind targetName subsribers
```
2 space indentation
src/Logary/Registry.fs
Outdated
```fsharp
let rec loop () =
  let inputCh, emitCh, shutdownCh, subscriberCh = Ch (), Ch (), Ch (), Ch ()

  let engine = { subscriptions = HashMap.empty
```
Format on new line
I think the PR is going in the right direction, and I understand it's primarily about discussion right now.
Yes; I was thinking an infinite source that's a synchronous/isolated/threaded thing, taking messages from the surrounding system. The "processing" function is then composed with:

```fsharp
module Engine =
  let create config =
    let buffer = Array.zeroCreate 512
    // bridge async/sync gap
    let rec source =
      Job.server (Job.isolate (job {
        while hasValues buffer do
          Interlocked.Increment(&chaser, 1)
      }))
    // in the engine loop:
    Flow.run (readBuffer chaser head buffer |> toSinks config.targets)
    // then somewhere below
    Job.result engine

let engine =
  memo (Engine.create ())

let impl level fac : Alt<Promise<unit>> =
  engine >>= fun engine ->
  Alt.prepareFun <| fun nack ->
    // allocate index
    let i = ref 0
    let p = Promise ()
    while Interlocked.CompareAndSwap(&head, ...) <> ... && not (Promise.isFulfilled nack) do
      buffer.[i] <- (level, fac, p)
    p

let logger = { new Logger with member x.logWithAck level fac = impl level fac }

let usage =
  let a = fn ()
  logger.info (eventX "Thing {a}" >> setField "a" a)
```

> Then Targets implement processing?

No, I'm thinking something like (pseudo code):

```fsharp
let processing =
  Flow.map enrich
  >> Flow.groupBy (Flow.rollingWindow 5s) (fun x -> x.fields.["service"])
  >> Flow.sink (target "elasticsearch")

let processing2 =
  let stream1 =
    Flow.map enrich
    >> ...
  let stream2 =
    Flow.map alter
    >> ...
  Flow.joinBy (Flow.rollingWindow 5s) (fun a b -> a.name = b.context.["relatedTo"])
```

Each sink could then be a source of a de-multiplexer/fan-in function that writes to the RingBuffers of the named targets (found by config lookup).
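A rough sketch of that fan-in idea, under the assumption that targets are looked up by name and each owns a ring buffer (every name here is hypothetical; `RingBuffer.put` is assumed to enqueue and return an `Alt<unit>`):

```fsharp
// Demultiplex sink output into the ring buffer of whichever target the
// config lookup names; unknown names fall through to a no-op.
let fanIn (targets : Map<string, RingBuffer<Message>>)
          (targetName : string)
          (message : Message) : Alt<unit> =
  match targets |> Map.tryFind targetName with
  | Some buffer -> RingBuffer.put buffer message // back-pressure via the buffer
  | None -> Alt.always () // no such target configured: drop
```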
A question about the previous design: in the previous version, when we got a logger we would find the corresponding targets and put messages into the targets' buffers when logging. But now, when we get a logger and log, will all messages go to the engine?
Yes, then all messages are discarded, because the new processing will take the place of …
Yes
Thanks for confirming, it is getting clearer. I will try to carry on.
@haf Can you help me check if …
I was thinking: once we have implemented the event processing builder, the logger itself needs less state; it is just an … and …
It doesn't have any state, but a reference to the engine.
yes
It is needed, because you may have multiple Logary instances running concurrently, and the LogManager/Registry/Engine needs an API to expose to the user.
How? Given the above requirement for not-just-globals.
But there still is; we still need a way of accepting the message into a given target and then ack on send.
I've changed my previous thoughts a bit: yes, you are right, the API is needed, because there can be multiple logary/registry instances. I've committed some changes, if you have time to look at them.
We should have the … My environment is improving and I can almost always use F# now.
Right now:

```fsharp
let inline tick (ticker : Ticker<'state,_,_>) pipe =
  pipe
  |> chain (fun cont ->
    let updateMb = Mailbox ()
    let rec loop state =
      Alt.choose [
        ticker.Ticked ^=> fun _ ->
          let state', res = ticker.HandleTick state
          cont res
          >>=. loop state'
        updateMb ^=> (ticker.Folder state >> loop)
      ]
    loop ticker.InitialState |> Hopac.start
    fun prev -> updateMb *<<+ prev)

[<AbstractClass>]
type Ticker<'state,'t,'r> (initialState : 'state) =
  let tickCh = Ch<unit> ()
  abstract member Folder : 'state -> 't -> 'state
  abstract member HandleTick : 'state -> 'state * 'r
  member this.InitialState = initialState
  member this.Ticked = tickCh :> Alt<_>
  member this.Tick () = tickCh *<- ()
  member this.TickEvery timespan =
    let cancellation = Cancellation.create ()
    let rec loop () =
      Alt.choose [
        timeOut timespan ^=> fun _ ->
          this.Tick () ^=> fun _ ->
          loop ()
        Cancellation.isCancelled cancellation
      ]
    loop ()
    |> Job.start
    >>-. cancellation
```

but I think …
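For what it's worth, a small concrete subclass of the `Ticker` above might look like this (my example, not code from the PR): a ticker that counts the messages folded in since the last tick and emits that count.

```fsharp
type CountTicker () =
  inherit Ticker<int64, Message, int64> (0L)
  // fold each incoming message into the running count
  override this.Folder count _message = count + 1L
  // on tick: reset the state to 0 and emit the window's count downstream
  override this.HandleTick count = 0L, count
```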
🎉
@haf I was thinking, …
@haf I added percentile and EWMA event processing; I think the aim here can work now. The processing builder looks like this:

```fsharp
Pipe.start
|> Events.tag "metric request latency"
|> Pipe.bufferTime (TimeSpan.FromSeconds 5.)
|> Events.percentile 0.99
|> Pipe.map (fun num -> Message.event Info (sprintf "99th percentile of request latency every rolling 5 second window is %A" num))

let fiveMinutesEWMATicker = EWMATicker (Duration.FromSeconds 1L, Duration.FromMinutes 5L)

Pipe.start
|> Events.tag "metric request latency"
|> Pipe.map (fun msg -> msg.value |> function Gauge (Int64 v, _) -> v | _ -> 1L)
|> Pipe.withTickJob (fiveMinutesEWMATicker.TickEvery (TimeSpan.FromSeconds 10.))
|> Pipe.tick fiveMinutesEWMATicker
|> Pipe.map (fun rate -> Message.event Info (sprintf "fiveMinutesEWMA of request latency's rate (samples/sec) is %A" rate))
```
Ping?
@lust4life Yes, that could work. How would you design the filtering and the end-to-end declaration of the processing pipeline? I don't really understand the code though; you have …
Generally, most of the time-related pipes encapsulate both as one, like this:

```fsharp
let inline bufferTime timespan pipe =
  let ticker = BufferTicker ()
  pipe
  |> withTickJob (ticker.TickEvery timespan)
  |> tick ticker
```

Because a timer needs to be managed, we should append them to the pipe (this is what the registry does):

```fsharp
let rec running ctss =
  Alt.choose [
    ...
    shutdownCh ^=> fun (res, timeout) ->
      rlogger.infoWithAck (eventX "Shutting down")
      ^=>. Seq.Con.iterJob Cancellation.cancel ctss
      >>=. shutdown targets timeout
      >>= fun shutdownInfo -> res *<= shutdownInfo
  ]

let state =
  { runtimeInfo = ri
    msgProcessing = msgProcessing
    flushCh = flushCh
    shutdownCh = shutdownCh }

createGlobals conf.runtimeInfo.logger state
>>=. Seq.Con.mapJob id conf.processing.tickTimerJobs
>>= fun ctss -> Job.supervise rlogger (Policy.restartDelayed 500u) (running ctss)
>>-. state
```

So the first usage of the instance sets up a tick timer, and the second sets up a ticker. In a situation where the user wants to control when to tick manually (like when some URL route has been matched), they can tick through a ticker by invoking it themselves:

```fsharp
type EWMATicker (rateUnit, alphaPeriod) =
  inherit Ticker<ExpWeightedMovAvg.EWMAState, int64, float> (ExpWeightedMovAvg.create alphaPeriod)
  override this.Folder ewma item =
    ExpWeightedMovAvg.update ewma item
  override this.HandleTick ewma =
    let ewma' = ExpWeightedMovAvg.tick ewma
    let rate = ewma' |> ExpWeightedMovAvg.rateInUnit rateUnit
    ewma', rate
```
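A small usage sketch of that manual tick (the route-handler hook is hypothetical; `Tick` comes from the `Ticker` base class shown earlier):

```fsharp
let fiveMinutesEWMATicker = EWMATicker (Duration.FromSeconds 1L, Duration.FromMinutes 5L)

// e.g. call this from a request handler when the interesting route matches,
// instead of (or in addition to) a TickEvery timer job; Tick () sends on the
// ticker's internal channel, so HandleTick runs inside the pipe's loop.
let onRouteMatched () =
  fiveMinutesEWMATicker.Tick () |> Hopac.start
```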
What does this mean? I don't get the point; can you explain it a little more?
I understand; I think the next point for me is to test your code a bit. What state is it in?
The basics are there; I just added a test to show how to use Processing to build a health checker (a ping-svc health checker). Next, maybe try to add unit tests for …
Maybe we can support …
@haf ping? 😸
Hey @lust4life You've proven that you're ready to stick to this project and help out, so I'm going to give you commit access; I'm looking forward to you merging this PR (and using your code!). We're using Logary throughout everything at my company, so I'm absolutely not abandoning this project, but I may have to keep a slightly lower profile. Let's keep the discussions going none the less. What do you say? Are you up for finishing up this PR and merging it to master as v5? Cheers
@haf The last few days I was on holiday in my hometown. Next, I will follow up to finish it. If there are any problems, I will ping you 😸
```fsharp
module Registry =
  /// The holder for the channels of communicating with the registry.
  type T =
    private {
      runtimeInfo : RuntimeInfo
      msgProcessing : Message -> Middleware option -> Alt<unit>

      /// Flush all pending messages from the registry to await shutdown and
      /// ack on the `ackCh` when done. If the client nacks the request, the
      /// `nack` promise is filled with a unit value. Optional duration of how
      /// long the flush 'waits' for targets before returning a FlushInfo.
      flushCh : Ch<Ch<FlushInfo> * Promise<unit> * Duration option>

      /// Shutdown the registry in full. This operation cannot be cancelled and
      /// so the caller is promised a ShutdownInfo.
      shutdownCh : Ch<IVar<ShutdownInfo> * Duration option>
    }

    member private x.Dispose (disposing : bool) =
      (x :> IAsyncDisposable).DisposeAsync () |> Hopac.start

    override x.Finalize () = x.Dispose false

    interface IDisposable with
      override x.Dispose () =
        x.Dispose true
        GC.SuppressFinalize(x)

    interface IAsyncDisposable with
      override x.DisposeAsync () =
        x.shutdownCh *<+=>- fun reply -> reply, None
        |> Job.startIgnore
```
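For the manual-shutdown alternative, a rough sketch (it would live inside the `Registry` module, since the record fields are private to it; the 5-second grace period is an arbitrary example, and the `*<+=>-` request/reply pattern mirrors the `DisposeAsync` above):

```fsharp
// Ask the registry to shut down and wait for the ShutdownInfo,
// instead of relying on the finalizer/GC.
let shutdownManually (t : T) : Alt<ShutdownInfo> =
  t.shutdownCh *<+=>- fun reply -> reply, Some (Duration.FromSeconds 5L)
```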
@haf Should we support something like this? Or should we let users shut it down manually, and not rely on the GC? Because …
…ossible with obj (instead of Value Model)
prepare for testing logary, ensure it can work mostly, then focus on breaking changes in other projects.
Work continues at #219
remove pollServices in registry
Because the supervisor will supervise its minions, an exception will be handled according to the policy. So there's no need for state polling: we can't do much even if we can get the status, and the user's handling logic can be expressed through the policy.
remove healthcheck & metric
remove healthcheck
It looks like a metric can also be seen as a healthcheck, e.g. Windows performance counters. When ticked from outside, the metric generates some messages, which will be processed by the user-defined engine; users can decide their meaning there (log, metric, healthcheck, ...).
remove metric
Metrics can be implemented with the engine's process expression builder; what we need to do is schedule jobs for ticking, either at the corresponding interval (like the old metricConf.tickInterval) or manually. Metric data sources can be of two types: one is similar to WPC, generating data in real time when ticked; the other is generated from the normal log messages.
support multiple targets on the internal logger, and apply rules when sending messages to a target
remove loglevel on Logger interface
The message carries the log level and the targets carry the rules; before putting a message into a target, apply the rules to the message to decide whether to actually send it, as sketched below.
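A minimal sketch of that rule check, with an assumed `Rule` shape (only the level comparison is implied by the note above; the logger-name regex is illustrative):

```fsharp
// Assumed rule shape: a minimum level plus a logger-name filter per target.
type Rule =
  { minLevel : LogLevel
    path : System.Text.RegularExpressions.Regex }

// Apply the target's rules to the message before enqueueing it.
let shouldSend (rules : Rule list) (msg : Message) =
  rules |> List.exists (fun rule ->
    msg.level >= rule.minLevel && rule.path.IsMatch (string msg.name))
```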
redefined Message
The aim here is to normalise the Gauge/Derived/Message semantics into a single semantic of 'things computed from events from your app'.
remove FieldModule.fs
use FsMessageTemplate for default MessageWriter
json support will use fspickler.json
no need for SuppressPointValue
gauges are stored in the context with a gauge type, so message.value is the template or the raw message
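For illustration, a sketch of storing a gauge in the context (the `gauge.` key prefix and `setGauge` helper are assumptions, not the final API; `Message.setContext` mirrors the `Message.tryGetContext` seen earlier in this thread, and the gauge's units value is elided):

```fsharp
// Assumed helper: message.value stays the event template; the gauge lives in
// the context under a well-known key (prefix made up for this sketch).
let setGauge name (gauge : Gauge) (msg : Message) =
  msg |> Message.setContext ("gauge." + name) gauge

// usage: an event that also carries a latency gauge
let logged units =
  Message.event Info "GET /api/users"
  |> setGauge "latency" (Gauge (Int64 87L, units))
```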
errors (exceptions)
are expressed via the context, not a field; users can define their own output template to decide whether or not to show them
todo
more unit tests
event format improvement mentioned by event format proposal #257
reorganize types in Events.fs
chiron new version in Serialisation.fs (no need; use fspickler instead)
complete the TODO comments in the codebase
maybe obsolete
PromisedLogger.fs
creating a logger is no longer async, just a function given a registry instance
service.fs
Metrics.Ticked.fs -> replaced by Events.Ticker
Transformers.fs -> replaced by Events.xxx (should be implemented in the events pipe style)
misc
the log API: because of the pipe's asynchrony, the Alt no longer works; consider refactoring to provide a default value instead, a Job<unit> should be fine. Also consider whether a logSimple timeout would break the unbounded-buffer case
the time xxx logging API: consider whether to replace the logger name with a gauge type; this would break API compatibility