@@ -2485,9 +2485,14 @@ var forceFlushInstanceWriteBuffer = make(chan bool)
2485
2485
2486
2486
func enqueueInstanceWrite (instance * Instance , instanceWasActuallyFound bool , lastError error ) {
2487
2487
if len (instanceWriteBuffer ) == config .Config .InstanceWriteBufferSize {
2488
- // Signal the "flushing" gorouting that there's work.
2488
+ // Signal the "flushing" goroutine that there's work.
2489
2489
// We prefer doing all bulk flushes from one goroutine.
2490
- forceFlushInstanceWriteBuffer <- true
2490
+ // Non blocking send to avoid blocking goroutines on sending a flush,
2491
+ // if the "flushing" goroutine is not able read is because a flushing is ongoing.
2492
+ select {
2493
+ case forceFlushInstanceWriteBuffer <- true :
2494
+ default :
2495
+ }
2491
2496
}
2492
2497
instanceWriteBuffer <- instanceUpdateObject {instance , instanceWasActuallyFound , lastError }
2493
2498
}
@@ -2512,7 +2517,11 @@ func flushInstanceWriteBuffer() {
2512
2517
writeBufferLatency .Stop ("wait" )
2513
2518
writeBufferLatency .Start ("flush" )
2514
2519
2515
- for i := 0 ; len (instanceWriteBuffer ) > 0 ; i ++ {
2520
+ // There are `DiscoveryMaxConcurrency` many goroutines trying to enqueue an instance into the buffer
2521
+ // when one instance is flushed from the buffer then one discovery goroutine is ready to enqueue a new instance
2522
+ // this is why we want to flush all instances in the buffer till a max of `InstanceWriteBufferSize`.
2523
+ // Otherwise we can flush way more instances than what's expected.
2524
+ for i := 0 ; i < config .Config .InstanceWriteBufferSize && len (instanceWriteBuffer ) > 0 ; i ++ {
2516
2525
upd := <- instanceWriteBuffer
2517
2526
if upd .instanceWasActuallyFound && upd .lastError == nil {
2518
2527
lastseen = append (lastseen , upd .instance )
0 commit comments