Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ const (
cgroupCacheExpiration = 5 * time.Minute
)

// processGroupPaths returns the cgroups associated with a process. This enables
// initCgroupPaths initializes a new cgroup reader. This enables
// unit testing by allowing us to stub the OS interface.
var processCgroupPaths = cgroup.ProcessCgroupPaths
var initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

func init() {
processors.RegisterPlugin(processorName, New)
Expand All @@ -61,11 +63,11 @@ type addDockerMetadata struct {
fields []string
sourceProcessor beat.Processor

pidFields []string // Field names that contain PIDs.
cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string).
hostFS resolve.Resolver // Directory where /proc is found
dedot bool // If set to true, replace dots in labels with `_`.
dockerAvailable bool // If Docker exists in env, then it is set to true
pidFields []string // Field names that contain PIDs.
cgroups *common.Cache // Cache of PID (int) to cgropus (map[string]string).
dedot bool // If set to true, replace dots in labels with `_`.
dockerAvailable bool // If Docker exists in env, then it is set to true
cgreader processors.CGReader
}

const selector = "add_docker_metadata"
Expand Down Expand Up @@ -110,15 +112,20 @@ func buildDockerMetadataProcessor(log *logp.Logger, cfg *conf.C, watcherConstruc
}
}

reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostFS), false)
if err != nil {
return nil, fmt.Errorf("error creating cgroup reader: %w", err)
}

return &addDockerMetadata{
log: log,
watcher: watcher,
fields: config.Fields,
sourceProcessor: sourceProcessor,
pidFields: config.MatchPIDs,
hostFS: resolve.NewTestResolver(config.HostFS),
dedot: config.DeDot,
dockerAvailable: dockerAvailable,
cgreader: reader,
}, nil
}

Expand Down Expand Up @@ -277,7 +284,7 @@ func (d *addDockerMetadata) getProcessCgroups(pid int) (cgroup.PathList, error)
return cgroups, nil
}

cgroups, err := processCgroupPaths(d.hostFS, pid)
cgroups, err := d.cgreader.ProcessCgroupPaths(pid)
if err != nil {
return cgroups, fmt.Errorf("failed to read cgroups for pid=%v: %w", pid, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/elastic-agent-autodiscover/bus"
"github.com/elastic/elastic-agent-autodiscover/docker"
"github.com/elastic/elastic-agent-libs/config"
Expand All @@ -37,29 +38,35 @@ import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

type testCGReader struct {
}

func (r testCGReader) ProcessCgroupPaths(pid int) (cgroup.PathList, error) {
switch pid {
case 1000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
},
}, nil
case 2000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
},
}, nil
case 3000:
// Parser error (hopefully this never happens).
return cgroup.PathList{}, fmt.Errorf("cgroup parse failure")
default:
return cgroup.PathList{}, os.ErrNotExist
}
}

func init() {
// Stub out the procfs.
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {

switch pid {
case 1000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/docker/8c147fdfab5a2608fe513d10294bf77cb502a231da9725093a155bd25cd1f14b", IsV2: false},
},
}, nil
case 2000:
return cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"memory": {ControllerPath: "/user.slice", IsV2: false},
},
}, nil
case 3000:
// Parser error (hopefully this never happens).
return cgroup.PathList{}, fmt.Errorf("cgroup parse failure")
default:
return cgroup.PathList{}, os.ErrNotExist
}
initCgroupPaths = func(_ resolve.Resolver, _ bool) (processors.CGReader, error) {
return testCGReader{}, nil
}
}

Expand Down
14 changes: 11 additions & 3 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ var (

procCache = newProcessCache(cacheExpiration, cacheCapacity, cacheEvictionEffort, gosysinfoProvider{})

processCgroupPaths = cgroup.ProcessCgroupPaths
// cgroups resolver, turned to a stub function to make testing easier.
initCgroupPaths processors.InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (processors.CGReader, error) {
return cgroup.NewReader(rootfsMountpoint, ignoreRootCgroups)
}

instanceID atomic.Uint32
)
Expand Down Expand Up @@ -160,6 +163,11 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta
}
}

reader, err := initCgroupPaths(resolve.NewTestResolver(config.HostPath), false)
if err != nil {
return nil, fmt.Errorf("error creating cgroup reader: %w", err)
}

// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if withCache && config.CgroupCacheExpireTime != 0 {
Expand All @@ -170,9 +178,9 @@ func newProcessMetadataProcessorWithProvider(config config, provider processMeta

p.cgroupsCache = common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
p.cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, p.cgroupsCache)
p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, p.cgroupsCache)
} else {
p.cidProvider = newCidProvider(resolve.NewTestResolver(config.HostPath), config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil)
p.cidProvider = newCidProvider(config.CgroupPrefixes, config.CgroupRegex, reader, nil)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,28 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/capabilities"
"github.com/elastic/beats/v7/libbeat/processors"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

type testCGRsolver struct {
res func(pid int) (cgroup.PathList, error)
}

func (t testCGRsolver) ProcessCgroupPaths(pid int) (cgroup.PathList, error) {
return t.res(pid)
}

func newCGHandlerBuilder(handler testCGRsolver) processors.InitCgroupHandler {
return func(_ resolve.Resolver, _ bool) (processors.CGReader, error) {
return handler, nil
}
}

func TestAddProcessMetadata(t *testing.T) {
logp.TestingSetup(logp.WithSelectors(processorName))

Expand Down Expand Up @@ -90,7 +105,7 @@ func TestAddProcessMetadata(t *testing.T) {
}

// mock of the cgroup processCgroupPaths
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {
processCgroupPaths := func(pid int) (cgroup.PathList, error) {
testMap := map[int]cgroup.PathList{
1: {
V1: map[string]cgroup.ControllerPath{
Expand Down Expand Up @@ -135,6 +150,7 @@ func TestAddProcessMetadata(t *testing.T) {

return testMap[pid], nil
}
initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths})

for _, test := range []struct {
description string
Expand Down Expand Up @@ -884,7 +900,7 @@ func TestUsingCache(t *testing.T) {
selfPID := os.Getpid()

// mock of the cgroup processCgroupPaths
processCgroupPaths = func(_ resolve.Resolver, pid int) (cgroup.PathList, error) {
processCgroupPaths := func(pid int) (cgroup.PathList, error) {
testStruct := cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {ControllerPath: "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1"},
Expand All @@ -909,7 +925,7 @@ func TestUsingCache(t *testing.T) {
// testMap :=
return testMap[pid], nil
}

initCgroupPaths = newCGHandlerBuilder(testCGRsolver{res: processCgroupPaths})
config, err := conf.NewConfigFrom(mapstr.M{
"match_pids": []string{"system.process.ppid"},
"include_fields": []string{"container.id", "process.env"},
Expand Down Expand Up @@ -1202,15 +1218,17 @@ func TestPIDToInt(t *testing.T) {
}

func TestV2CID(t *testing.T) {
processCgroupPaths = func(_ resolve.Resolver, _ int) (cgroup.PathList, error) {
processCgroupPaths := func(_ int) (cgroup.PathList, error) {
testMap := cgroup.PathList{
V1: map[string]cgroup.ControllerPath{
"cpu": {IsV2: true, ControllerPath: "system.slice/docker-2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805.scope"},
},
}
return testMap, nil
}
provider := newCidProvider(resolve.NewTestResolver(""), nil, defaultCgroupRegex, processCgroupPaths, nil)
resolver := testCGRsolver{res: processCgroupPaths}
initCgroupPaths = newCGHandlerBuilder(resolver)
provider := newCidProvider(nil, defaultCgroupRegex, resolver, nil)
result, err := provider.GetCid(1)
assert.NoError(t, err)
assert.Equal(t, "2dcbab615aebfa9313feffc5cfdacd381543cfa04c6be3f39ac656e55ef34805", result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

const (
Expand All @@ -37,10 +37,9 @@ const (

type gosigarCidProvider struct {
log *logp.Logger
hostPath resolve.Resolver
cgroupPrefixes []string
cgroupRegex *regexp.Regexp
processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error)
processCgroupPaths processors.CGReader
pidCidCache *common.Cache
}

Expand Down Expand Up @@ -70,10 +69,9 @@ func (p gosigarCidProvider) GetCid(pid int) (result string, err error) {
return cid, nil
}

func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths func(resolve.Resolver, int) (cgroup.PathList, error), pidCidCache *common.Cache) gosigarCidProvider {
func newCidProvider(cgroupPrefixes []string, cgroupRegex *regexp.Regexp, processCgroupPaths processors.CGReader, pidCidCache *common.Cache) gosigarCidProvider {
return gosigarCidProvider{
log: logp.NewLogger(providerName),
hostPath: hostPath,
cgroupPrefixes: cgroupPrefixes,
cgroupRegex: cgroupRegex,
processCgroupPaths: processCgroupPaths,
Expand All @@ -84,7 +82,7 @@ func newCidProvider(hostPath resolve.Resolver, cgroupPrefixes []string, cgroupRe
// getProcessCgroups returns a mapping of cgroup subsystem name to path. It
// returns an error if it failed to retrieve the cgroup info.
func (p gosigarCidProvider) getProcessCgroups(pid int) (cgroup.PathList, error) {
pathList, err := p.processCgroupPaths(p.hostPath, pid)
pathList, err := p.processCgroupPaths.ProcessCgroupPaths(pid)
if err != nil {
var pathError *fs.PathError
if errors.As(err, &pathError) {
Expand Down
32 changes: 32 additions & 0 deletions libbeat/processors/cgroups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package processors

import (
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
)

// InitCgroupHandler is a type for creating stubs for the cgroup resolver. Used primarily for testing.
type InitCgroupHandler = func(rootfsMountpoint resolve.Resolver, ignoreRootCgroups bool) (CGReader, error)

// CGReader wraps the group Reader.ProcessCgroupPaths() call, this allows us to
// set different cgroups readers for testing.
type CGReader interface {
ProcessCgroupPaths(pid int) (cgroup.PathList, error)
}