
Commit 6ed7ae7

mergify[bot] and kvch authored
Add new parser to filestream input: container (#26115) (#26221)
## What does this PR do?

This PR adds support for a new parser named `container`. This is the reader that powers the `container` input behind the scenes. Now it is exposed as a parser.

Example configuration for reading container logs with the `filestream` input:

```yaml
type: filestream
paths:
  - /path/to/containers/*/*.log
parsers:
  - container: ~
```

### Limitations

The PR does not provide feature parity with the `container` input because of the lack of support for separating the states of the stdout and stderr streams. It is coming in a follow-up PR.

## Why is it important?

It is a step toward supporting reading container logs from every input that supports the `parsers` option.

(cherry picked from commit e2449af)

Co-authored-by: Noémi Ványi <[email protected]>
1 parent 240062b commit 6ed7ae7

6 files changed (+290, -0 lines changed)

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
@@ -578,6 +578,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 - Enhance GCP module to populate orchestrator.* fields for GKE / K8S logs {pull}25368[25368]
 - Make `filestream` input GA. {pull}26127[26127]
 - http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764]
+- Add new `parser` to `filestream` input: `container`. {pull}26115[26115]
 
 *Heartbeat*
 

filebeat/docs/inputs/input-filestream-reader-options.asciidoc

Lines changed: 26 additions & 0 deletions
@@ -151,6 +151,7 @@ Available parsers:
 
 * `multiline`
 * `ndjson`
+* `container`
 
 In this example, {beatname_uc} is reading multiline messages that consist of 3 lines
 and are encapsulated in single-line JSON objects.
@@ -232,3 +233,28 @@ JSON document and stored in `@metadata._id`
 *`ignore_decoding_error`*:: An optional configuration setting that specifies if
 JSON decoding errors should be logged or not. If set to true, errors will not
 be logged. The default is false.
+
+[float]
+===== `container`
+
+Use the `container` parser to extract information from containers log files.
+It parses lines into common message lines, extracting timestamps too.
+
+*`stream`*:: Reads from the specified streams only: `all`, `stdout` or `stderr`. The default
+is `all`.
+
+*`format`*:: Use the given format when parsing logs: `auto`, `docker` or `cri`. The
+default is `auto`, it will automatically detect the format. To disable
+autodetection set any of the other options.
+
+The following snippet configures {beatname_uc} to read the `stdout` stream from
+all containers under the default Kubernetes logs path:
+
+[source,yaml]
+----
+paths:
+  - "/var/log/containers/*.log"
+parsers:
+  - container:
+      stream: stdout
+----

filebeat/input/filestream/parser.go

Lines changed: 15 additions & 0 deletions
@@ -70,6 +70,14 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace)
 				return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err)
 			}
 			p = readjson.NewJSONParser(p, &config)
+		case "container":
+			config := readjson.DefaultContainerConfig()
+			cfg := ns.Config()
+			err := cfg.Unpack(&config)
+			if err != nil {
+				return nil, fmt.Errorf("error while parsing container parser config: %+v", err)
+			}
+			p = readjson.NewContainerParser(p, &config)
 		default:
 			return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
 		}
@@ -96,6 +104,13 @@ func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error {
 			if err != nil {
 				return fmt.Errorf("error while parsing ndjson parser config: %+v", err)
 			}
+		case "container":
+			config := readjson.DefaultContainerConfig()
+			cfg := ns.Config()
+			err := cfg.Unpack(&config)
+			if err != nil {
+				return fmt.Errorf("error while parsing container parser config: %+v", err)
+			}
 		default:
 			return fmt.Errorf("%s: %s", ErrNoSuchParser, name)
 		}
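Both hunks follow the pattern already used for `ndjson`: each configured parser unpacks its own settings and then wraps the reader built so far (`p = readjson.NewContainerParser(p, &config)`), so the entries of the `parsers` list compose into a chain of readers. A minimal sketch of that composition, using simplified stand-in types rather than libbeat's real `reader.Reader` and `reader.Message`:

```go
package main

import "fmt"

// Simplified stand-ins for libbeat's reader.Message / reader.Reader,
// for illustration only.
type message struct{ content string }

type lineReader interface {
	next() (message, error)
}

// sliceReader plays the role of the base file reader at the bottom of the chain.
type sliceReader struct {
	lines []string
	pos   int
}

func (r *sliceReader) next() (message, error) {
	if r.pos >= len(r.lines) {
		return message{}, fmt.Errorf("EOF")
	}
	m := message{content: r.lines[r.pos]}
	r.pos++
	return m, nil
}

// tagParser wraps another reader and transforms every message it returns,
// mirroring how each configured parser wraps the previous one.
type tagParser struct {
	inner lineReader
	tag   string
}

func (p *tagParser) next() (message, error) {
	m, err := p.inner.next()
	if err != nil {
		return m, err
	}
	m.content = p.tag + m.content
	return m, nil
}

func main() {
	// Building the chain mirrors the switch in newParsers: start from the
	// base reader and reassign it as each parser wraps the previous value.
	var r lineReader = &sliceReader{lines: []string{"one", "two"}}
	r = &tagParser{inner: r, tag: "[a] "}
	r = &tagParser{inner: r, tag: "[b] "}

	for m, err := r.next(); err == nil; m, err = r.next() {
		fmt.Println(m.content) // "[b] [a] one", "[b] [a] two"
	}
}
```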

filebeat/input/filestream/parser_test.go

Lines changed: 120 additions & 0 deletions
@@ -258,6 +258,126 @@ func TestJSONParsersWithFields(t *testing.T) {
 
 }
 
+func TestContainerParser(t *testing.T) {
+	tests := map[string]struct {
+		lines            string
+		parsers          map[string]interface{}
+		expectedMessages []reader.Message
+	}{
+		"simple docker lines": {
+			lines: `{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
+{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}
+{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
+{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"}
+`,
+			parsers: map[string]interface{}{
+				"paths": []string{"dummy_path"},
+				"parsers": []map[string]interface{}{
+					map[string]interface{}{
+						"container": map[string]interface{}{},
+					},
+				},
+			},
+			expectedMessages: []reader.Message{
+				reader.Message{
+					Content: []byte("Fetching main repository github.com/elastic/beats...\n"),
+					Fields: common.MapStr{
+						"stream": "stdout",
+					},
+				},
+				reader.Message{
+					Content: []byte("Fetching dependencies...\n"),
+					Fields: common.MapStr{
+						"stream": "stdout",
+					},
+				},
+				reader.Message{
+					Content: []byte("Execute /scripts/packetbeat_before_build.sh\n"),
+					Fields: common.MapStr{
+						"stream": "stdout",
+					},
+				},
+				reader.Message{
+					Content: []byte("patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n"),
+					Fields: common.MapStr{
+						"stream": "stdout",
+					},
+				},
+			},
+		},
+		"CRI docker lines": {
+			lines: `2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
+`,
+			parsers: map[string]interface{}{
+				"paths": []string{"dummy_path"},
+				"parsers": []map[string]interface{}{
+					map[string]interface{}{
+						"container": map[string]interface{}{
+							"format": "cri",
+						},
+					},
+				},
+			},
+			expectedMessages: []reader.Message{
+				reader.Message{
+					Content: []byte("2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache\n"),
+					Fields: common.MapStr{
+						"stream": "stdout",
+					},
+				},
+			},
+		},
+		"corrupt docker lines are skipped": {
+			lines: `{"log":"Fetching main repository github.com/elastic/beats...\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
+"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}
+{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
+`,
+			parsers: map[string]interface{}{
+				"paths": []string{"dummy_path"},
+				"parsers": []map[string]interface{}{
+					map[string]interface{}{
+						"container": map[string]interface{}{},
+					},
+				},
+			},
+			expectedMessages: []reader.Message{
+				reader.Message{
+					Content: []byte("Fetching main repository github.com/elastic/beats...\n"),
+					Fields: common.MapStr{
+						"stream": "stdout",
+					},
+				},
+				reader.Message{
+					Content: []byte("Execute /scripts/packetbeat_before_build.sh\n"),
+					Fields: common.MapStr{
+						"stream": "stdout",
+					},
+				},
+			},
+		},
+	}
+
+	for name, test := range tests {
+		test := test
+		t.Run(name, func(t *testing.T) {
+			cfg := defaultConfig()
+			parsersConfig := common.MustNewConfigFrom(test.parsers)
+			err := parsersConfig.Unpack(&cfg)
+			require.NoError(t, err)
+
+			p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 1024}, cfg.Reader.Parsers)
+
+			i := 0
+			msg, err := p.Next()
+			for err == nil {
+				require.Equal(t, test.expectedMessages[i].Content, msg.Content)
+				require.Equal(t, test.expectedMessages[i].Fields, msg.Fields)
+				i++
+				msg, err = p.Next()
+			}
+		})
+	}
+}
 func testReader(lines string) reader.Reader {
 	encF, _ := encoding.FindEncoding("")
 	reader := strings.NewReader(lines)

libbeat/reader/readjson/docker_json.go

Lines changed: 27 additions & 0 deletions
@@ -87,6 +87,33 @@ func New(r reader.Reader, stream string, partial bool, format string, CRIFlags b
 	return &reader
 }
 
+func NewContainerParser(r reader.Reader, config *ContainerJSONConfig) *DockerJSONReader {
+	reader := DockerJSONReader{
+		stream:   config.Stream.String(),
+		partial:  true,
+		reader:   r,
+		criflags: true,
+		logger:   logp.NewLogger("parser_container"),
+	}
+
+	switch config.Format {
+	case Docker, JSONFile:
+		reader.parseLine = reader.parseDockerJSONLog
+	case CRI:
+		reader.parseLine = reader.parseCRILog
+	default:
+		reader.parseLine = reader.parseAuto
+	}
+
+	if runtime.GOOS == "windows" {
+		reader.stripNewLine = stripNewLineWin
+	} else {
+		reader.stripNewLine = stripNewLine
+	}
+
+	return &reader
+}
+
 // parseCRILog parses logs in CRI log format.
 // CRI log format example :
 // 2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
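The format switch in `NewContainerParser` selects the line parser up front and leaves auto-detection to the default branch. `parseAuto` itself is not part of this diff, but the sample data in the tests shows the distinction it has to make: Docker `json-file` lines are JSON objects, while CRI lines start with an RFC 3339 timestamp. A hedged sketch of that idea (an illustration, not the actual libbeat implementation):

```go
package main

import "fmt"

// detectFormat is an illustrative stand-in for format auto-detection:
// json-file lines begin with '{', anything else is treated as CRI.
func detectFormat(line []byte) string {
	if len(line) > 0 && line[0] == '{' {
		return "docker" // e.g. {"log":"...\n","stream":"stdout","time":"..."}
	}
	return "cri" // e.g. 2017-09-12T22:32:21.212861448Z stdout F <message>
}

func main() {
	fmt.Println(detectFormat([]byte(`{"log":"Fetching dependencies...\n","stream":"stdout","time":"2016-03-02T22:59:04.609292428Z"}`)))
	fmt.Println(detectFormat([]byte("2017-09-12T22:32:21.212861448Z stdout F Invalidating dataplane cache")))
}
```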
Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package readjson
+
+import "fmt"
+
+type ContainerFormat uint8
+
+type Stream uint8
+
+const (
+	Auto ContainerFormat = iota + 1
+	CRI
+	Docker
+	JSONFile
+
+	All Stream = iota + 1
+	Stdout
+	Stderr
+)
+
+var (
+	containerFormats = map[string]ContainerFormat{
+		"auto":      Auto,
+		"cri":       CRI,
+		"docker":    Docker,
+		"json-file": JSONFile,
+	}
+
+	containerStreams = map[string]Stream{
+		"all":    All,
+		"stdout": Stdout,
+		"stderr": Stderr,
+	}
+)
+
+type ContainerJSONConfig struct {
+	Stream Stream          `config:"stream"`
+	Format ContainerFormat `config:"format"`
+}
+
+func DefaultContainerConfig() ContainerJSONConfig {
+	return ContainerJSONConfig{
+		Format: Auto,
+		Stream: All,
+	}
+}
+
+func (f *ContainerFormat) Unpack(v string) error {
+	val, ok := containerFormats[v]
+	if !ok {
+		keys := make([]string, len(containerFormats))
+		i := 0
+		for k := range containerFormats {
+			keys[i] = k
+			i++
+		}
+		return fmt.Errorf("unknown container log format: %s, supported values: %+v", v, keys)
+	}
+	*f = val
+	return nil
+}
+
+func (s *Stream) Unpack(v string) error {
+	val, ok := containerStreams[v]
+	if !ok {
+		keys := make([]string, len(containerStreams))
+		i := 0
+		for k := range containerStreams {
+			keys[i] = k
+			i++
+		}
+		return fmt.Errorf("unknown streams: %s, supported values: %+v", v, keys)
+	}
+	*s = val
+	return nil
+}
+
+func (s *Stream) String() string {
+	for k, v := range containerStreams {
+		if v == *s {
+			return k
+		}
+	}
+	return ""
+}
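The `Unpack(v string) error` methods are what allow go-ucfg to map the `stream` and `format` strings from the configuration onto these typed constants; they are triggered by the `cfg.Unpack(&config)` calls added to `parser.go` above. A small usage sketch, assuming the beats v7 module path and the `common.MustNewConfigFrom` helper that the tests use:

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"
	"github.com/elastic/beats/v7/libbeat/reader/readjson"
)

func main() {
	// Settings as they would arrive from the `container` parser section
	// of a filestream input configuration.
	raw := common.MustNewConfigFrom(map[string]interface{}{
		"stream": "stderr",
		"format": "cri",
	})

	// Start from the defaults (format: auto, stream: all) and let go-ucfg
	// call the custom Unpack methods for the two enum-like fields.
	config := readjson.DefaultContainerConfig()
	if err := raw.Unpack(&config); err != nil {
		// An unknown value such as format: foo would end up here.
		fmt.Println("invalid container parser config:", err)
		return
	}

	fmt.Println("stream:", config.Stream.String()) // stderr
	fmt.Println("format is CRI:", config.Format == readjson.CRI)
}
```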
