Skip to content

Commit ad2672d

Browse files
authored
Remove global logger from libbeat publisher and reader (#16886)
1 parent 3459158 commit ad2672d

File tree

8 files changed

+33
-24
lines changed

8 files changed

+33
-24
lines changed

libbeat/publisher/pipeline/output.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type netClientWorker struct {
4040

4141
batchSize int
4242
batchSizer func() int
43+
logger *logp.Logger
4344
}
4445

4546
func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker {
@@ -85,24 +86,24 @@ func (w *netClientWorker) run() {
8586
batch.Cancelled()
8687

8788
if w.closed.Load() {
88-
logp.Info("Closed connection to %v", w.client)
89+
w.logger.Infof("Closed connection to %v", w.client)
8990
return
9091
}
9192

9293
if reconnectAttempts > 0 {
93-
logp.Info("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
94+
w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts)
9495
} else {
95-
logp.Info("Connecting to %v", w.client)
96+
w.logger.Infof("Connecting to %v", w.client)
9697
}
9798

9899
err := w.client.Connect()
99100
if err != nil {
100-
logp.Err("Failed to connect to %v: %v", w.client, err)
101+
w.logger.Errorf("Failed to connect to %v: %v", w.client, err)
101102
reconnectAttempts++
102103
continue
103104
}
104105

105-
logp.Info("Connection to %v established", w.client)
106+
w.logger.Infof("Connection to %v established", w.client)
106107
reconnectAttempts = 0
107108
break
108109
}
@@ -118,7 +119,7 @@ func (w *netClientWorker) run() {
118119

119120
err := w.client.Publish(batch)
120121
if err != nil {
121-
logp.Err("Failed to publish events: %v", err)
122+
w.logger.Errorf("Failed to publish events: %v", err)
122123
// on error return to connect loop
123124
break
124125
}

libbeat/publisher/pipeline/stress/gen.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import (
2424
"sync"
2525
"time"
2626

27+
"github.com/elastic/beats/v7/libbeat/logp"
28+
2729
"github.com/elastic/beats/v7/libbeat/beat"
2830
"github.com/elastic/beats/v7/libbeat/common"
2931
"github.com/elastic/beats/v7/libbeat/common/atomic"
30-
"github.com/elastic/beats/v7/libbeat/logp"
3132
)
3233

3334
type generateConfig struct {
@@ -65,9 +66,10 @@ func generate(
6566
WaitClose: config.WaitClose,
6667
}
6768

69+
logger := logp.NewLogger("publisher_pipeline_stress_generate")
6870
if config.ACK {
6971
settings.ACKCount = func(n int) {
70-
logp.Info("Pipeline client (%v) ACKS; %v", id, n)
72+
logger.Infof("Pipeline client (%v) ACKS; %v", id, n)
7173
}
7274
}
7375

@@ -89,7 +91,7 @@ func generate(
8991
panic(err)
9092
}
9193

92-
defer logp.Info("client (%v) closed: %v", id, time.Now())
94+
defer logger.Infof("client (%v) closed: %v", id, time.Now())
9395

9496
done := make(chan struct{})
9597
defer close(done)
@@ -136,8 +138,8 @@ func generate(
136138
})
137139
}
138140

139-
logp.Info("start (%v) generator: %v", id, time.Now())
140-
defer logp.Info("stop (%v) generator: %v", id, time.Now())
141+
logger.Infof("start (%v) generator: %v", id, time.Now())
142+
defer logger.Infof("stop (%v) generator: %v", id, time.Now())
141143

142144
for cs.Active() {
143145
event := beat.Event{

libbeat/publisher/pipeline/stress/run.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ func RunTests(
8484
return fmt.Errorf("loading pipeline failed: %+v", err)
8585
}
8686
defer func() {
87-
logp.Info("Stop pipeline")
87+
log.Info("Stop pipeline")
8888
pipeline.Close()
89-
logp.Info("pipeline closed")
89+
log.Info("pipeline closed")
9090
}()
9191

9292
cs := newCloseSignaler()
@@ -100,7 +100,7 @@ func RunTests(
100100
withWG(&genWG, func() {
101101
err := generate(cs, pipeline, config.Generate, i, errors)
102102
if err != nil {
103-
logp.Err("Generator failed with: %v", err)
103+
log.Errorf("Generator failed with: %v", err)
104104
}
105105
})
106106
}

libbeat/publisher/processing/processors.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type processorFn struct {
4141
}
4242

4343
func newGeneralizeProcessor(keepNull bool) *processorFn {
44+
logger := logp.NewLogger("publisher_processing")
4445
return newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) {
4546
// Filter out empty events. Empty events are still reported by ACK callbacks.
4647
if len(event.Fields) == 0 {
@@ -50,7 +51,7 @@ func newGeneralizeProcessor(keepNull bool) *processorFn {
5051
g := common.NewGenericEventConverter(keepNull)
5152
fields := g.Convert(event.Fields)
5253
if fields == nil {
53-
logp.Err("fail to convert to generic event")
54+
logger.Error("fail to convert to generic event")
5455
return nil, nil
5556
}
5657

libbeat/reader/multiline/multiline.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Reader struct {
5252
err error // last seen error
5353
state func(*Reader) (reader.Message, error)
5454
message reader.Message
55+
logger *logp.Logger
5556
}
5657

5758
const (
@@ -143,7 +144,7 @@ func (mlr *Reader) readFirst() (reader.Message, error) {
143144
continue
144145
}
145146

146-
logp.Debug("multiline", "Multiline event flushed because timeout reached.")
147+
mlr.logger.Debug("Multiline event flushed because timeout reached.")
147148

148149
// pass error to caller (next layer) for handling
149150
return message, err
@@ -172,7 +173,7 @@ func (mlr *Reader) readNext() (reader.Message, error) {
172173
continue
173174
}
174175

175-
logp.Debug("multiline", "Multiline event flushed because timeout reached.")
176+
mlr.logger.Debug("Multiline event flushed because timeout reached.")
176177

177178
// return collected multiline event and
178179
// empty buffer for new multiline event

libbeat/reader/readfile/line.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type LineReader struct {
4141
inOffset int // input buffer read offset
4242
byteCount int // number of bytes decoded from input buffer into output buffer
4343
decoder transform.Transformer
44+
logger *logp.Logger
4445
}
4546

4647
// New creates a new reader object
@@ -86,15 +87,15 @@ func (r *LineReader) Next() ([]byte, int, error) {
8687

8788
// This can happen if something goes wrong during decoding
8889
if len(buf) == 0 {
89-
logp.Err("Empty buffer returned by advance")
90+
r.logger.Error("Empty buffer returned by advance")
9091
continue
9192
}
9293

9394
if bytes.HasSuffix(buf, r.decodedNl) {
9495
break
9596
} else {
96-
logp.Debug("line", "Line ending char found which wasn't one: %c", buf[len(buf)-1])
97-
logp.Debug("line", "In %s", string(buf))
97+
r.logger.Debugf("Line ending char found which wasn't one: %c", buf[len(buf)-1])
98+
r.logger.Debugf("In %s", string(buf))
9899
}
99100
}
100101

@@ -151,7 +152,7 @@ func (r *LineReader) advance() error {
151152
// -> decode input sequence into outBuffer
152153
sz, err := r.decode(idx + len(r.nl))
153154
if err != nil {
154-
logp.Err("Error decoding line: %s", err)
155+
r.logger.Errorf("Error decoding line: %s", err)
155156
// In case of error increase size by unencoded length
156157
sz = idx + len(r.nl)
157158
}

libbeat/reader/readjson/docker_json.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ type DockerJSONReader struct {
4646
parseLine func(message *reader.Message, msg *logLine) error
4747

4848
stripNewLine func(msg *reader.Message)
49+
50+
logger *logp.Logger
4951
}
5052

5153
type logLine struct {
@@ -198,7 +200,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
198200
var logLine logLine
199201
err = p.parseLine(&message, &logLine)
200202
if err != nil {
201-
logp.Err("Parse line error: %v", err)
203+
p.logger.Errorf("Parse line error: %v", err)
202204
return message, reader.ErrLineUnparsable
203205
}
204206

@@ -215,7 +217,7 @@ func (p *DockerJSONReader) Next() (reader.Message, error) {
215217
}
216218
err = p.parseLine(&next, &logLine)
217219
if err != nil {
218-
logp.Err("Parse line error: %v", err)
220+
p.logger.Errorf("Parse line error: %v", err)
219221
return message, reader.ErrLineUnparsable
220222
}
221223
message.Content = append(message.Content, next.Content...)

libbeat/reader/readjson/json.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
type JSONReader struct {
3535
reader reader.Reader
3636
cfg *Config
37+
logger *logp.Logger
3738
}
3839

3940
// NewJSONReader creates a new reader that can decode JSON.
@@ -49,7 +50,7 @@ func (r *JSONReader) decode(text []byte) ([]byte, common.MapStr) {
4950
err := unmarshal(text, &jsonFields)
5051
if err != nil || jsonFields == nil {
5152
if !r.cfg.IgnoreDecodingError {
52-
logp.Err("Error decoding JSON: %v", err)
53+
r.logger.Errorf("Error decoding JSON: %v", err)
5354
}
5455
if r.cfg.AddErrorKey {
5556
jsonFields = common.MapStr{"error": createJSONError(fmt.Sprintf("Error decoding JSON: %v", err))}

0 commit comments

Comments
 (0)