Skip to content

Commit 02d3c70

Browse files
authored
Cherry-pick #16886 to 7.x: [Libbeat] Remove global logger from libbeat publisher and reader (#16912)
* Remove global logger from libbeat publisher and reader (#16886) (cherry picked from commit ad2672d) * backport #16915 in this PR
1 parent 0eeb109 commit 02d3c70

File tree

8 files changed

+47
-26
lines changed

8 files changed

+47
-26
lines changed

libbeat/publisher/pipeline/output.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,17 @@ type netClientWorker struct {
4040

4141
batchSize int
4242
batchSizer func() int
43+
logger *logp.Logger
4344
}
4445

4546
func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker {
4647
if nc, ok := client.(outputs.NetworkClient); ok {
47-
c := &netClientWorker{observer: observer, qu: qu, client: nc}
48+
c := &netClientWorker{
49+
observer: observer,
50+
qu: qu,
51+
client: nc,
52+
logger: logp.NewLogger("publisher_pipeline_output"),
53+
}
4854
go c.run()
4955
return c
5056
}
@@ -85,24 +91,24 @@ func (w *netClientWorker) run() {
8591
batch.Cancelled()
8692

8793
if w.closed.Load() {
88-
logp.Info("Closed connection to %v", w.client)
94+
w.logger.Infof("Closed connection to %v", w.client)
8995
return
9096
}
9197

9298
if reconnectAttempts > 0 {
93-
logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
99+
w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
94100
} else {
95-
logp.Info("Connecting to %v", w.client)
101+
w.logger.Infof("Connecting to %v", w.client)
96102
}
97103

98104
err := w.client.Connect()
99105
if err != nil {
100-
logp.Err("Failed to connect to %v: %v", w.client, err)
106+
w.logger.Errorf("Failed to connect to %v: %v", w.client, err)
101107
reconnectAttempts++
102108
continue
103109
}
104110

105-
logp.Info("Connection to %v established", w.client)
111+
w.logger.Infof("Connection to %v established", w.client)
106112
reconnectAttempts = 0
107113
break
108114
}
@@ -118,7 +124,7 @@ func (w *netClientWorker) run() {
118124

119125
err := w.client.Publish(batch)
120126
if err != nil {
121-
logp.Err("Failed to publish events: %v", err)
127+
w.logger.Errorf("Failed to publish events: %v", err)
122128
// on error return to connect loop
123129
break
124130
}

libbeat/publisher/pipeline/stress/gen.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import (
2424
"sync"
2525
"time"
2626

27+
"github.com/elastic/beats/v7/libbeat/logp"
28+
2729
"github.com/elastic/beats/v7/libbeat/beat"
2830
"github.com/elastic/beats/v7/libbeat/common"
2931
"github.com/elastic/beats/v7/libbeat/common/atomic"
30-
"github.com/elastic/beats/v7/libbeat/logp"
3132
)
3233

3334
type generateConfig struct {
@@ -65,9 +66,10 @@ func generate(
6566
WaitClose: config.WaitClose,
6667
}
6768

69+
logger := logp.NewLogger("publisher_pipeline_stress_generate")
6870
if config.ACK {
6971
settings.ACKCount = func(n int) {
70-
logp.Info("Pipeline client (%v) ACKS; %v", id, n)
72+
logger.Infof("Pipeline client (%v) ACKS; %v", id, n)
7173
}
7274
}
7375

@@ -89,7 +91,7 @@ func generate(
8991
panic(err)
9092
}
9193

92-
defer logp.Info("client (%v) closed: %v", id, time.Now())
94+
defer logger.Infof("client (%v) closed: %v", id, time.Now())
9395

9496
done := make(chan struct{})
9597
defer close(done)
@@ -136,8 +138,8 @@ func generate(
136138
})
137139
}
138140

139-
logp.Info("start (%v) generator: %v", id, time.Now())
140-
defer logp.Info("stop (%v) generator: %v", id, time.Now())
141+
logger.Infof("start (%v) generator: %v", id, time.Now())
142+
defer logger.Infof("stop (%v) generator: %v", id, time.Now())
141143

142144
for cs.Active() {
143145
event := beat.Event{

libbeat/publisher/pipeline/stress/run.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ func RunTests(
8484
return fmt.Errorf("loading pipeline failed: %+v", err)
8585
}
8686
defer func() {
87-
logp.Info("Stop pipeline")
87+
log.Info("Stop pipeline")
8888
pipeline.Close()
89-
logp.Info("pipeline closed")
89+
log.Info("pipeline closed")
9090
}()
9191

9292
cs := newCloseSignaler()
@@ -100,7 +100,7 @@ func RunTests(
100100
withWG(&genWG, func() {
101101
err := generate(cs, pipeline, config.Generate, i, errors)
102102
if err != nil {
103-
logp.Err("Generator failed with: %v", err)
103+
log.Errorf("Generator failed with: %v", err)
104104
}
105105
})
106106
}

libbeat/publisher/processing/processors.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type processorFn struct {
4141
}
4242

4343
func newGeneralizeProcessor(keepNull bool) *processorFn {
44+
logger := logp.NewLogger("publisher_processing")
4445
return newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) {
4546
// Filter out empty events. Empty events are still reported by ACK callbacks.
4647
if len(event.Fields) == 0 {
@@ -50,7 +51,7 @@ func newGeneralizeProcessor(keepNull bool) *processorFn {
5051
g := common.NewGenericEventConverter(keepNull)
5152
fields := g.Convert(event.Fields)
5253
if fields == nil {
53-
logp.Err("fail to convert to generic event")
54+
logger.Error("fail to convert to generic event")
5455
return nil, nil
5556
}
5657

libbeat/reader/multiline/multiline.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Reader struct {
5252
err error // last seen error
5353
state func(*Reader) (reader.Message, error)
5454
message reader.Message
55+
logger *logp.Logger
5556
}
5657

5758
const (
@@ -125,6 +126,7 @@ func New(
125126
maxLines: maxLines,
126127
separator: []byte(separator),
127128
message: reader.Message{},
129+
logger: logp.NewLogger("reader_multiline"),
128130
}
129131
return mlr, nil
130132
}
@@ -143,7 +145,7 @@ func (mlr *Reader) readFirst() (reader.Message, error) {
143145
continue
144146
}
145147

146-
logp.Debug("multiline", "Multiline event flushed because timeout reached.")
148+
mlr.logger.Debug("Multiline event flushed because timeout reached.")
147149

148150
// pass error to caller (next layer) for handling
149151
return message, err
@@ -172,7 +174,7 @@ func (mlr *Reader) readNext() (reader.Message, error) {
172174
continue
173175
}
174176

175-
logp.Debug("multiline", "Multiline event flushed because timeout reached.")
177+
mlr.logger.Debug("Multiline event flushed because timeout reached.")
176178

177179
// return collected multiline event and
178180
// empty buffer for new multiline event

libbeat/reader/readfile/line.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type LineReader struct {
4141
inOffset int // input buffer read offset
4242
byteCount int // number of bytes decoded from input buffer into output buffer
4343
decoder transform.Transformer
44+
logger *logp.Logger
4445
}
4546

4647
// New creates a new reader object
@@ -66,6 +67,7 @@ func NewLineReader(input io.Reader, config Config) (*LineReader, error) {
6667
decodedNl: terminator,
6768
inBuffer: streambuf.New(nil),
6869
outBuffer: streambuf.New(nil),
70+
logger: logp.NewLogger("reader_line"),
6971
}, nil
7072
}
7173

@@ -86,15 +88,15 @@ func (r *LineReader) Next() ([]byte, int, error) {
8688

8789
// This can happen if something goes wrong during decoding
8890
if len(buf) == 0 {
89-
logp.Err("Empty buffer returned by advance")
91+
r.logger.Error("Empty buffer returned by advance")
9092
continue
9193
}
9294

9395
if bytes.HasSuffix(buf, r.decodedNl) {
9496
break
9597
} else {
96-
logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1])
97-
logp.Debug("line", "In %s", string(buf))
98+
r.logger.Debugf("Line ending char found which wasn't one: %c", buf[len(buf)-1])
99+
r.logger.Debugf("In %s", string(buf))
98100
}
99101
}
100102

@@ -151,7 +153,7 @@ func (r *LineReader) advance() error {
151153
// -> decode input sequence into outBuffer
152154
sz, err := r.decode(idx + len(r.nl))
153155
if err != nil {
154-
logp.Err("Error decoding line: %s", err)
156+
r.logger.Errorf("Error decoding line: %s", err)
155157
// In case of error increase size by unencoded length
156158
sz = idx + len(r.nl)
157159
}

libbeat/reader/readjson/docker_json.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ type DockerJSONReader struct {
4646
parseLine func(message *reader.Message, msg *logLine) error
4747

4848
stripNewLine func(msg *reader.Message)
49+
50+
logger *logp.Logger
4951
}
5052

5153
type logLine struct {
@@ -64,6 +66,7 @@ func New(r reader.Reader, stream string, partial bool, format string, CRIFlags b
6466
partial: partial,
6567
reader: r,
6668
criflags: CRIFlags,
69+
logger: logp.NewLogger("reader_docker_json"),
6770
}
6871

6972
switch strings.ToLower(format) {
@@ -198,7 +201,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
198201
var logLine logLine
199202
err = p.parseLine(&message, &logLine)
200203
if err != nil {
201-
logp.Err("Parse line error: %v", err)
204+
p.logger.Errorf("Parse line error: %v", err)
202205
return message, reader.ErrLineUnparsable
203206
}
204207

@@ -215,7 +218,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
215218
}
216219
err = p.parseLine(&next, &logLine)
217220
if err != nil {
218-
logp.Err("Parse line error: %v", err)
221+
p.logger.Errorf("Parse line error: %v", err)
219222
return message, reader.ErrLineUnparsable
220223
}
221224
message.Content = append(message.Content, next.Content...)

libbeat/reader/readjson/json.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,16 @@ import (
3434
type JSONReader struct {
3535
reader reader.Reader
3636
cfg *Config
37+
logger *logp.Logger
3738
}
3839

3940
// NewJSONReader creates a new reader that can decode JSON.
4041
func NewJSONReader(r reader.Reader, cfg *Config) *JSONReader {
41-
return &JSONReader{reader: r, cfg: cfg}
42+
return &JSONReader{
43+
reader: r,
44+
cfg: cfg,
45+
logger: logp.NewLogger("reader_json"),
46+
}
4247
}
4348

4449
// decodeJSON unmarshals the text parameter into a MapStr and
@@ -49,7 +54,7 @@ func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) {
4954
err := unmarshal(text, &jsonFields)
5055
if err != nil || jsonFields == nil {
5156
if !r.cfg.IgnoreDecodingError {
52-
logp.Err("Error decoding JSON: %v", err)
57+
r.logger.Errorf("Error decoding JSON: %v", err)
5358
}
5459
if r.cfg.AddErrorKey {
5560
jsonFields = common.MapStr{"error": createJSONError(fmt.Sprintf("Error decoding JSON: %v", err))}

0 commit comments

Comments
 (0)