From e277183bb27afa086607cd260160a7482f5409c9 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Tue, 5 Jul 2022 17:58:46 +0200 Subject: [PATCH] Added metrics to kafka ingest --- pkg/pipeline/ingest/ingest_kafka.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 26f2624be..ac122ddbb 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -97,10 +97,12 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) { if len(records) >= ingestK.batchMaxLength { log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in)) decoded := ingestK.decoder.Decode(records) - out <- decoded + linesProcessed.Add(float64(len(records))) + queueLength.Set(float64(len(out))) ingestK.prevRecords = decoded log.Debugf("prevRecords = %v", ingestK.prevRecords) records = []interface{}{} + out <- decoded } case <-flushRecords.C: // Maximum batch time for each batch // Process batch of records (if not empty) @@ -113,6 +115,8 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) { } log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in)) decoded := ingestK.decoder.Decode(records) + linesProcessed.Add(float64(len(records))) + queueLength.Set(float64(len(out))) ingestK.prevRecords = decoded log.Debugf("prevRecords = %v", ingestK.prevRecords) out <- decoded