From b9de5c62baebeb159f1b8cc6b289a5b6fc2fcaa0 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Wed, 19 Sep 2018 16:28:27 +0100 Subject: [PATCH 01/21] Add host, packages, and processes metricsets Implements simple, initial versions of what a host, packages, and processes metricsets could look like. --- x-pack/auditbeat/include/list.go | 2 + x-pack/auditbeat/module/system/host/host.go | 43 ++- .../system/packages/_meta/docs.asciidoc | 8 + .../module/system/packages/_meta/fields.yml | 6 + .../module/system/packages/config.go | 17 + .../module/system/packages/packages.go | 292 ++++++++++++++++++ .../system/processes/_meta/docs.asciidoc | 8 + .../module/system/processes/_meta/fields.yml | 6 + .../module/system/processes/config.go | 17 + .../module/system/processes/processes.go | 89 ++++++ 10 files changed, 482 insertions(+), 6 deletions(-) create mode 100644 x-pack/auditbeat/module/system/packages/_meta/docs.asciidoc create mode 100644 x-pack/auditbeat/module/system/packages/_meta/fields.yml create mode 100644 x-pack/auditbeat/module/system/packages/config.go create mode 100644 x-pack/auditbeat/module/system/packages/packages.go create mode 100644 x-pack/auditbeat/module/system/processes/_meta/docs.asciidoc create mode 100644 x-pack/auditbeat/module/system/processes/_meta/fields.yml create mode 100644 x-pack/auditbeat/module/system/processes/config.go create mode 100644 x-pack/auditbeat/module/system/processes/processes.go diff --git a/x-pack/auditbeat/include/list.go b/x-pack/auditbeat/include/list.go index 1820d2ce9a0b..a75d8d536c3f 100644 --- a/x-pack/auditbeat/include/list.go +++ b/x-pack/auditbeat/include/list.go @@ -8,4 +8,6 @@ import ( // Include all Auditbeat modules so that they register their // factories with the global registry. _ "github.com/elastic/beats/x-pack/auditbeat/module/system/host" + _ "github.com/elastic/beats/x-pack/auditbeat/module/system/packages" + _ "github.com/elastic/beats/x-pack/auditbeat/module/system/processes" ) diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index 1b630a7e4ea5..f982aea45b52 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -10,6 +10,9 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/go-sysinfo" ) const ( @@ -26,6 +29,7 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { mb.BaseMetricSet + log *logp.Logger } // New constructs a new MetricSet. @@ -37,14 +41,41 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) } - return &MetricSet{base}, nil + return &MetricSet{ + BaseMetricSet: base, + log: logp.NewLogger(moduleName), + }, nil } // Fetch collects data about the host. It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { - report.Event(mb.Event{ - RootFields: common.MapStr{ - "hello": "world", - }, - }) + if host, err := sysinfo.Host(); err == nil { + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + // https://github.com/elastic/ecs#-host-fields + "uptime": host.Info().Uptime(), + "boottime": host.Info().BootTime, + "containerized": host.Info().Containerized, + "timezone": host.Info().Timezone, + "timezone.offset.sec": host.Info().TimezoneOffsetSec, + "name": host.Info().Hostname, + "id": host.Info().UniqueID, + "ip": host.Info().IPs, + "mac": host.Info().MACs, + // TODO "host.type": ? + "architecture": host.Info().Architecture, + + // https://github.com/elastic/ecs#-operating-system-fields + "os": common.MapStr{ + "platform": host.Info().OS.Platform, + "name": host.Info().OS.Name, + "family": host.Info().OS.Family, + "version": host.Info().OS.Version, + "kernel": host.Info().KernelVersion, + }, + }, + }) + } else { + ms.log.Errorw("Failed to load host information", "error", err) + } } diff --git a/x-pack/auditbeat/module/system/packages/_meta/docs.asciidoc b/x-pack/auditbeat/module/system/packages/_meta/docs.asciidoc new file mode 100644 index 000000000000..a2f70162151f --- /dev/null +++ b/x-pack/auditbeat/module/system/packages/_meta/docs.asciidoc @@ -0,0 +1,8 @@ +The System `packages` metricset provides ... TODO. + +The module is implemented for Linux, macOS (Darwin), and Windows. + +[float] +=== Configuration options + +TODO diff --git a/x-pack/auditbeat/module/system/packages/_meta/fields.yml b/x-pack/auditbeat/module/system/packages/_meta/fields.yml new file mode 100644 index 000000000000..baaef240d12c --- /dev/null +++ b/x-pack/auditbeat/module/system/packages/_meta/fields.yml @@ -0,0 +1,6 @@ +- name: packages + type: group + description: > + `packages` contains TODO. + release: experimental + fields: diff --git a/x-pack/auditbeat/module/system/packages/config.go b/x-pack/auditbeat/module/system/packages/config.go new file mode 100644 index 000000000000..9f532039447b --- /dev/null +++ b/x-pack/auditbeat/module/system/packages/config.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package packages + +// Config defines the host metricset's configuration options. +type Config struct { + // TODO: Add config options. +} + +// Validate validates the host metricset config. +func (c *Config) Validate() error { + return nil +} + +var defaultConfig = Config{} diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go new file mode 100644 index 000000000000..7289897712f7 --- /dev/null +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -0,0 +1,292 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package packages + +import ( + "bufio" + "fmt" + "io/ioutil" + "os" + "os/exec" + "path" + "strconv" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/metricbeat/mb" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/go-sysinfo" +) + +const ( + moduleName = "system" + metricsetName = "packages" +) + +func init() { + mb.Registry.MustAddMetricSet(moduleName, metricsetName, New, + mb.DefaultMetricSet(), + ) +} + +// MetricSet collects data about the host. +type MetricSet struct { + mb.BaseMetricSet + log *logp.Logger +} + +// New constructs a new MetricSet. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) + + config := defaultConfig + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) + } + + return &MetricSet{ + BaseMetricSet: base, + log: logp.NewLogger(moduleName), + }, nil +} + +// Fetch collects data about the host. It is invoked periodically. +func (ms *MetricSet) Fetch(report mb.ReporterV2) { + host, err := sysinfo.Host() + if err != nil { + ms.log.Errorw("Error getting the OS", "error", err) + } + + hostInfo := host.Info() + if hostInfo.OS == nil { + ms.log.Errorw("No OS info from sysinfo.Host", "error", err) + } + + var packages []Package + switch hostInfo.OS.Family { + case "redhat": + packages, err = listRPMPackages() + if err != nil { + ms.log.Errorw("Error getting RPM packages", "error", err) + } + case "debian": + packages, err = listDebPackages() + if err != nil { + ms.log.Errorw("Error getting DEB packages", "error", err) + } + case "darwin": + packages, err = listBrewPackages() + if err != nil { + ms.log.Errorw("Error getting Homebrew packages", "error", err) + } + default: + ms.log.Errorw("No logic for getting packages for OS family", "os", hostInfo.OS.Family) + } + + var pkgInfos []common.MapStr + + for _, pkg := range packages { + pkgInfos = append(pkgInfos, common.MapStr{ + "package.name": pkg.Name, + "package.version": pkg.Version, + "package.release": pkg.Release, + "package.arch": pkg.Arch, + "package.license": pkg.License, + "package.installtime": pkg.InstallTime, + "package.size": pkg.Size, + "package.summary": pkg.Summary, + "package.url": pkg.URL, + }) + } + + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "packages": pkgInfos, + }, + }) +} + +type Package struct { + Name string + Version string + Release string + Arch string + License string + InstallTime time.Time + Size uint64 + Summary string + URL string +} + +/* +The following functions copied from https://github.com/tsg/listpackages/blob/master/main.go +*/ +func listRPMPackages() ([]Package, error) { + format := "%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}|%{LICENSE}|%{INSTALLTIME}|%{SIZE}|%{URL}|%{SUMMARY}\\n" + out, err := exec.Command("/usr/bin/rpm", "--qf", format, "-qa").Output() + if err != nil { + return nil, fmt.Errorf("Error running rpm -qa command: %v", err) + } + + lines := strings.Split(string(out), "\n") + packages := []Package{} + for _, line := range lines { + if len(strings.TrimSpace(line)) == 0 { + continue + } + words := strings.SplitN(line, "|", 9) + if len(words) < 9 { + return nil, fmt.Errorf("Line '%s' doesn't have enough elements", line) + } + pkg := Package{ + Name: words[0], + Version: words[1], + Release: words[2], + Arch: words[3], + License: words[4], + // install time - 5 + // size - 6 + URL: words[7], + Summary: words[8], + } + ts, err := strconv.ParseInt(words[5], 10, 64) + if err != nil { + return nil, fmt.Errorf("Error converting %s to string: %v", words[5], err) + } + pkg.InstallTime = time.Unix(ts, 0) + + pkg.Size, err = strconv.ParseUint(words[6], 10, 64) + if err != nil { + return nil, fmt.Errorf("Error converting %s to string: %v", words[6], err) + } + + packages = append(packages, pkg) + + } + + return packages, nil +} + +func listDebPackages() ([]Package, error) { + statusFile := "/var/lib/dpkg/status" + file, err := os.Open(statusFile) + if err != nil { + return nil, fmt.Errorf("Error opening '%s': %v", statusFile, err) + } + defer file.Close() + + packages := []Package{} + pkg := &Package{} + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if len(strings.TrimSpace(line)) == 0 { + // empty line signals new package + packages = append(packages, *pkg) + pkg = &Package{} + continue + } + if strings.HasPrefix(line, " ") { + // not interested in multi-lines for now + continue + } + words := strings.SplitN(line, ":", 2) + if len(words) != 2 { + return nil, fmt.Errorf("The following line was unexpected (no ':' found): '%s'", line) + } + value := strings.TrimSpace(words[1]) + switch strings.ToLower(words[0]) { + case "package": + pkg.Name = value + case "architecture": + pkg.Arch = value + case "version": + pkg.Version = value + case "description": + pkg.Summary = value + case "installed-size": + pkg.Size, err = strconv.ParseUint(value, 10, 64) + if err != nil { + return nil, fmt.Errorf("Error converting %s to int: %v", value, err) + } + default: + continue + } + } + if err = scanner.Err(); err != nil { + return nil, fmt.Errorf("Error scanning file: %v", err) + } + return packages, nil +} + +func listBrewPackages() ([]Package, error) { + cellarPath := "/usr/local/Cellar" + + cellarInfo, err := os.Stat(cellarPath) + if err != nil { + return nil, fmt.Errorf("Homebrew cellar not found in %s: %v", cellarPath, err) + } + if !cellarInfo.IsDir() { + return nil, fmt.Errorf("%s is not a directory", cellarPath) + } + + packageDirs, err := ioutil.ReadDir(cellarPath) + if err != nil { + return nil, fmt.Errorf("Error reading directory %s: %v", cellarPath, err) + } + + packages := []Package{} + for _, packageDir := range packageDirs { + if !packageDir.IsDir() { + continue + } + pkgPath := path.Join(cellarPath, packageDir.Name()) + versions, err := ioutil.ReadDir(pkgPath) + if err != nil { + return nil, fmt.Errorf("Error reading directory: %s: %v", pkgPath, err) + } + for _, version := range versions { + if !version.IsDir() { + continue + } + pkg := Package{ + Name: packageDir.Name(), + Version: version.Name(), + InstallTime: version.ModTime(), + } + + // read formula + formulaPath := path.Join(cellarPath, pkg.Name, pkg.Version, ".brew", pkg.Name+".rb") + file, err := os.Open(formulaPath) + if err != nil { + //fmt.Printf("WARNING: Can't get formula for package %s-%s\n", pkg.Name, pkg.Version) + // TODO: follow the path from INSTALL_RECEIPT.json to find the formula + continue + } + scanner := bufio.NewScanner(file) + count := 15 // only look into the first few lines of the formula + for scanner.Scan() { + count -= 1 + if count == 0 { + break + } + line := scanner.Text() + if strings.HasPrefix(line, " desc ") { + pkg.Summary = strings.Trim(line[7:], " \"") + } else if strings.HasPrefix(line, " homepage ") { + pkg.URL = strings.Trim(line[11:], " \"") + } + } + + packages = append(packages, pkg) + } + } + return packages, nil +} diff --git a/x-pack/auditbeat/module/system/processes/_meta/docs.asciidoc b/x-pack/auditbeat/module/system/processes/_meta/docs.asciidoc new file mode 100644 index 000000000000..83d4a74159f2 --- /dev/null +++ b/x-pack/auditbeat/module/system/processes/_meta/docs.asciidoc @@ -0,0 +1,8 @@ +The System `processes` metricset provides ... TODO. + +The module is implemented for Linux, macOS (Darwin), and Windows. + +[float] +=== Configuration options + +TODO diff --git a/x-pack/auditbeat/module/system/processes/_meta/fields.yml b/x-pack/auditbeat/module/system/processes/_meta/fields.yml new file mode 100644 index 000000000000..d55488828d71 --- /dev/null +++ b/x-pack/auditbeat/module/system/processes/_meta/fields.yml @@ -0,0 +1,6 @@ +- name: processes + type: group + description: > + `processes` contains TODO. + release: experimental + fields: diff --git a/x-pack/auditbeat/module/system/processes/config.go b/x-pack/auditbeat/module/system/processes/config.go new file mode 100644 index 000000000000..2bea612a44e8 --- /dev/null +++ b/x-pack/auditbeat/module/system/processes/config.go @@ -0,0 +1,17 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package processes + +// Config defines the host metricset's configuration options. +type Config struct { + // TODO: Add config options. +} + +// Validate validates the host metricset config. +func (c *Config) Validate() error { + return nil +} + +var defaultConfig = Config{} diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go new file mode 100644 index 000000000000..9934f33a3429 --- /dev/null +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -0,0 +1,89 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package processes + +import ( + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/metricbeat/mb" + + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/metric/system/process" + "github.com/elastic/go-sysinfo" +) + +const ( + moduleName = "system" + metricsetName = "processes" +) + +func init() { + mb.Registry.MustAddMetricSet(moduleName, metricsetName, New, + mb.DefaultMetricSet(), + ) +} + +// MetricSet collects data about the host. +type MetricSet struct { + mb.BaseMetricSet + log *logp.Logger +} + +// New constructs a new MetricSet. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) + + config := defaultConfig + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) + } + + return &MetricSet{ + BaseMetricSet: base, + log: logp.NewLogger(moduleName), + }, nil +} + +// Fetch collects data about the host. It is invoked periodically. +func (ms *MetricSet) Fetch(report mb.ReporterV2) { + // TODO: Implement Processes() in go-sysinfo + // e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41 + pids, err := process.Pids() + if err != nil { + ms.log.Errorw("Failed to fetch the list of PIDs", "error", err) + } + + var processInfos []common.MapStr + + for _, pid := range pids { + if p, err := sysinfo.Process(pid); err == nil { + if pInfo, err := p.Info(); err == nil { + processInfos = append(processInfos, common.MapStr{ + // https://github.com/elastic/ecs#-process-fields + "process.args": pInfo.Args, + "process.name": pInfo.Name, + "process.pid": pInfo.PID, + "process.ppid": pInfo.PPID, + + "process.cwd": pInfo.CWD, + "process.exe": pInfo.Exe, + "process.starttime": pInfo.StartTime, + }) + } else { + ms.log.Errorw("Failed to load process information", "error", err) + } + } else { + ms.log.Errorw("Failed to load process", "error", err) + } + } + + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "processes": processInfos, + }, + }) +} From 687f54b09c26c2087b1f5ed676869050e6088cb9 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Thu, 20 Sep 2018 11:05:41 +0100 Subject: [PATCH 02/21] Add diffing to processes metricset --- .../module/system/processes/processes.go | 130 +++++++++++++++--- 1 file changed, 112 insertions(+), 18 deletions(-) diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index 9934f33a3429..394b33319edb 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -5,11 +5,14 @@ package processes import ( + "strconv" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/go-sysinfo/types" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/metric/system/process" @@ -30,7 +33,8 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { mb.BaseMetricSet - log *logp.Logger + cache map[string](*types.ProcessInfo) + log *logp.Logger } // New constructs a new MetricSet. @@ -48,8 +52,112 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { }, nil } +// fastHash returns a hash calculated using FNV-1 of the PID and StartTime. +// Based on https://github.com/elastic/gosigar/blob/master/sys/linux/inetdiag.go#L362 +// TODO: Move to go-sysinfo +// Actually, might not need this after all. Delete? +/*func fastHashProcessInfo(pInfo *types.ProcessInfo) uint64 { + h := fnv.New64() + h.Write([]byte(strconv.Itoa(pInfo.PID))) + h.Write([]byte(pInfo.StartTime.String())) + return h.Sum64() +}*/ + +func processInfoNaiveHash(pInfo *types.ProcessInfo) string { + // Could use real hash e.g. FNV if there is an advantage + return strconv.Itoa(pInfo.PID) + pInfo.StartTime.String() +} + +func (ms *MetricSet) diffCache(current []*types.ProcessInfo) (new, missing []*types.ProcessInfo) { + // Check for new - what is in current but not in cache + for _, pInfo := range current { + if _, inCache := ms.cache[processInfoNaiveHash(pInfo)]; !inCache { + new = append(new, pInfo) + } + } + + // Check for missing - what is no longer in current that was in the cache + for cachedPInfoKey, cachedPInfo := range ms.cache { + found := false + for _, currentPInfo := range current { + if processInfoNaiveHash(currentPInfo) == cachedPInfoKey { + found = true + break + } + } + + if !found { + missing = append(missing, cachedPInfo) + } + } + + return +} + +func processInfoToMapStr(pInfo *types.ProcessInfo) common.MapStr { + return common.MapStr{ + // https://github.com/elastic/ecs#-process-fields + "process.args": pInfo.Args, + "process.name": pInfo.Name, + "process.pid": pInfo.PID, + "process.ppid": pInfo.PPID, + + "process.cwd": pInfo.CWD, + "process.exe": pInfo.Exe, + "process.starttime": pInfo.StartTime, + } +} + // Fetch collects data about the host. It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { + processInfos := ms.getProcessInfos() + + diff := true + if ms.cache != nil && diff { + // find out which processes were stopped or started, if any + started, stopped := ms.diffCache(processInfos) + + for _, pInfo := range started { + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "status": "started", + "process": processInfoToMapStr(pInfo), + }, + }) + } + + for _, pInfo := range stopped { + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "status": "stopped", + "process": processInfoToMapStr(pInfo), + }, + }) + } + } else { + var processEvents []common.MapStr + + for _, pInfo := range processInfos { + processEvents = append(processEvents, processInfoToMapStr(pInfo)) + } + + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "processes": processEvents, + }, + }) + } + + if diff { + // Refill cache + ms.cache = make(map[string](*types.ProcessInfo)) + for _, pInfo := range processInfos { + ms.cache[processInfoNaiveHash(pInfo)] = pInfo + } + } +} + +func (ms *MetricSet) getProcessInfos() []*types.ProcessInfo { // TODO: Implement Processes() in go-sysinfo // e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41 pids, err := process.Pids() @@ -57,22 +165,12 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { ms.log.Errorw("Failed to fetch the list of PIDs", "error", err) } - var processInfos []common.MapStr + var processInfos []*types.ProcessInfo for _, pid := range pids { if p, err := sysinfo.Process(pid); err == nil { if pInfo, err := p.Info(); err == nil { - processInfos = append(processInfos, common.MapStr{ - // https://github.com/elastic/ecs#-process-fields - "process.args": pInfo.Args, - "process.name": pInfo.Name, - "process.pid": pInfo.PID, - "process.ppid": pInfo.PPID, - - "process.cwd": pInfo.CWD, - "process.exe": pInfo.Exe, - "process.starttime": pInfo.StartTime, - }) + processInfos = append(processInfos, &pInfo) } else { ms.log.Errorw("Failed to load process information", "error", err) } @@ -81,9 +179,5 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { } } - report.Event(mb.Event{ - MetricSetFields: common.MapStr{ - "processes": processInfos, - }, - }) + return processInfos } From 0b89b90c9e7853d58589b6f17cbba51f9224b302 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Thu, 20 Sep 2018 17:01:25 +0100 Subject: [PATCH 03/21] Add diffing to packages metricset --- x-pack/auditbeat/cache/cache.go | 58 +++++++ .../module/system/{host => config}/config.go | 11 +- x-pack/auditbeat/module/system/host/host.go | 3 +- .../module/system/packages/config.go | 17 -- .../module/system/packages/packages.go | 155 +++++++++++++----- .../module/system/processes/config.go | 17 -- .../module/system/processes/processes.go | 123 ++++++-------- 7 files changed, 227 insertions(+), 157 deletions(-) create mode 100644 x-pack/auditbeat/cache/cache.go rename x-pack/auditbeat/module/system/{host => config}/config.go (67%) delete mode 100644 x-pack/auditbeat/module/system/packages/config.go delete mode 100644 x-pack/auditbeat/module/system/processes/config.go diff --git a/x-pack/auditbeat/cache/cache.go b/x-pack/auditbeat/cache/cache.go new file mode 100644 index 000000000000..b63fcdface4d --- /dev/null +++ b/x-pack/auditbeat/cache/cache.go @@ -0,0 +1,58 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cache + +// Cache is just a map being used as a cache. +type Cache struct { + hashMap map[string]Cacheable +} + +// Cacheable is the interface items stored in Cache need to implement. +type Cacheable interface { + Hash() string +} + +// New creates a new cache. +func New() *Cache { + return &Cache{ + hashMap: make(map[string]Cacheable), + } +} + +// IsEmpty checks if the cache is empty. +func (cache *Cache) IsEmpty() bool { + return len(cache.hashMap) == 0 +} + +// DiffAndUpdateCache takes a list of new items to cache, compares them to the current +// cache contents, and returns both items new to the cache and items that are in the cache +// but missing in the new data. +func (cache *Cache) DiffAndUpdateCache(current []Cacheable) (new, missing []interface{}) { + // Check for and delete missing - what is no longer in current that was in the cache + for cacheKey, cacheValue := range cache.hashMap { + found := false + for _, currentValue := range current { + if currentValue.Hash() == cacheKey { + found = true + break + } + } + + if !found { + missing = append(missing, cacheValue) + delete(cache.hashMap, cacheKey) + } + } + + // Check for new - what is in current but not in cache + for _, currentValue := range current { + if _, contains := cache.hashMap[currentValue.Hash()]; !contains { + new = append(new, currentValue) + cache.hashMap[currentValue.Hash()] = currentValue + } + } + + return +} diff --git a/x-pack/auditbeat/module/system/host/config.go b/x-pack/auditbeat/module/system/config/config.go similarity index 67% rename from x-pack/auditbeat/module/system/host/config.go rename to x-pack/auditbeat/module/system/config/config.go index 3cfaf464aff3..7a120607c0df 100644 --- a/x-pack/auditbeat/module/system/host/config.go +++ b/x-pack/auditbeat/module/system/config/config.go @@ -2,11 +2,11 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package host +package config // Config defines the host metricset's configuration options. type Config struct { - // TODO: Add config options. + ReportChanges bool `config:"report_changes"` } // Validate validates the host metricset config. @@ -14,4 +14,9 @@ func (c *Config) Validate() error { return nil } -var defaultConfig = Config{} +// NewDefaultConfig returns a default configuration for this module. +func NewDefaultConfig() Config { + return Config{ + ReportChanges: true, + } +} diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index f982aea45b52..0b197d716222 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/x-pack/auditbeat/module/system/config" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/go-sysinfo" @@ -36,7 +37,7 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) - config := defaultConfig + config := config.NewDefaultConfig() if err := base.Module().UnpackConfig(&config); err != nil { return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) } diff --git a/x-pack/auditbeat/module/system/packages/config.go b/x-pack/auditbeat/module/system/packages/config.go deleted file mode 100644 index 9f532039447b..000000000000 --- a/x-pack/auditbeat/module/system/packages/config.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package packages - -// Config defines the host metricset's configuration options. -type Config struct { - // TODO: Add config options. -} - -// Validate validates the host metricset config. -func (c *Config) Validate() error { - return nil -} - -var defaultConfig = Config{} diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index 7289897712f7..b2c451bcab12 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -20,6 +20,8 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system/config" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/go-sysinfo" @@ -39,26 +41,124 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { mb.BaseMetricSet - log *logp.Logger + config config.Config + cache *cache.Cache + log *logp.Logger +} + +// Package represents information for a package. +type Package struct { + Name string + Version string + Release string + Arch string + License string + InstallTime time.Time + Size uint64 + Summary string + URL string +} + +// Hash creates a hash for Package. +func (pkg Package) Hash() string { + // Could use real hash e.g. FNV if there is an advantage + return pkg.Name + pkg.InstallTime.String() +} + +func (pkg Package) toMapStr() common.MapStr { + return common.MapStr{ + "package.name": pkg.Name, + "package.version": pkg.Version, + "package.release": pkg.Release, + "package.arch": pkg.Arch, + "package.license": pkg.License, + "package.installtime": pkg.InstallTime, + "package.size": pkg.Size, + "package.summary": pkg.Summary, + "package.url": pkg.URL, + } } // New constructs a new MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) - config := defaultConfig + config := config.NewDefaultConfig() if err := base.Module().UnpackConfig(&config); err != nil { return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) } - return &MetricSet{ + ms := &MetricSet{ BaseMetricSet: base, + config: config, log: logp.NewLogger(moduleName), - }, nil + } + + if config.ReportChanges { + ms.cache = cache.New() + } + + return ms, nil } // Fetch collects data about the host. It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { + packages := ms.getPackages() + + /*var pkgInfos []common.MapStr + + for _, pkg := range packages { + pkgInfos = append(pkgInfos, pkg.toMapStr()) + } + + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "packages": pkgInfos, + }, + })*/ + + if ms.cache != nil && !ms.cache.IsEmpty() { + installed, removed := ms.cache.DiffAndUpdateCache(packages) + + for _, pkgInfo := range installed { + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "status": "installed", + "packages": pkgInfo.(Package).toMapStr(), + }, + }) + } + + for _, pkgInfo := range removed { + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "status": "removed", + "packages": pkgInfo.(Package).toMapStr(), + }, + }) + } + } else { + // Report all installed packages + var pkgInfos []common.MapStr + + for _, pkgInfo := range packages { + pkgInfos = append(pkgInfos, pkgInfo.(Package).toMapStr()) + } + + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + "packages": pkgInfos, + }, + }) + + if ms.cache != nil { + // This will initialize the cache with the current packages + ms.cache.DiffAndUpdateCache(packages) + } + } +} + +func (ms *MetricSet) getPackages() (packages []cache.Cacheable) { host, err := sysinfo.Host() if err != nil { ms.log.Errorw("Error getting the OS", "error", err) @@ -69,7 +169,6 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { ms.log.Errorw("No OS info from sysinfo.Host", "error", err) } - var packages []Package switch hostInfo.OS.Family { case "redhat": packages, err = listRPMPackages() @@ -90,45 +189,13 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { ms.log.Errorw("No logic for getting packages for OS family", "os", hostInfo.OS.Family) } - var pkgInfos []common.MapStr - - for _, pkg := range packages { - pkgInfos = append(pkgInfos, common.MapStr{ - "package.name": pkg.Name, - "package.version": pkg.Version, - "package.release": pkg.Release, - "package.arch": pkg.Arch, - "package.license": pkg.License, - "package.installtime": pkg.InstallTime, - "package.size": pkg.Size, - "package.summary": pkg.Summary, - "package.url": pkg.URL, - }) - } - - report.Event(mb.Event{ - MetricSetFields: common.MapStr{ - "packages": pkgInfos, - }, - }) -} - -type Package struct { - Name string - Version string - Release string - Arch string - License string - InstallTime time.Time - Size uint64 - Summary string - URL string + return } /* The following functions copied from https://github.com/tsg/listpackages/blob/master/main.go */ -func listRPMPackages() ([]Package, error) { +func listRPMPackages() ([]cache.Cacheable, error) { format := "%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}|%{LICENSE}|%{INSTALLTIME}|%{SIZE}|%{URL}|%{SUMMARY}\\n" out, err := exec.Command("/usr/bin/rpm", "--qf", format, "-qa").Output() if err != nil { @@ -136,7 +203,7 @@ func listRPMPackages() ([]Package, error) { } lines := strings.Split(string(out), "\n") - packages := []Package{} + packages := []cache.Cacheable{} for _, line := range lines { if len(strings.TrimSpace(line)) == 0 { continue @@ -174,7 +241,7 @@ func listRPMPackages() ([]Package, error) { return packages, nil } -func listDebPackages() ([]Package, error) { +func listDebPackages() ([]cache.Cacheable, error) { statusFile := "/var/lib/dpkg/status" file, err := os.Open(statusFile) if err != nil { @@ -182,7 +249,7 @@ func listDebPackages() ([]Package, error) { } defer file.Close() - packages := []Package{} + packages := []cache.Cacheable{} pkg := &Package{} scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -226,7 +293,7 @@ func listDebPackages() ([]Package, error) { return packages, nil } -func listBrewPackages() ([]Package, error) { +func listBrewPackages() ([]cache.Cacheable, error) { cellarPath := "/usr/local/Cellar" cellarInfo, err := os.Stat(cellarPath) @@ -242,7 +309,7 @@ func listBrewPackages() ([]Package, error) { return nil, fmt.Errorf("Error reading directory %s: %v", cellarPath, err) } - packages := []Package{} + packages := []cache.Cacheable{} for _, packageDir := range packageDirs { if !packageDir.IsDir() { continue diff --git a/x-pack/auditbeat/module/system/processes/config.go b/x-pack/auditbeat/module/system/processes/config.go deleted file mode 100644 index 2bea612a44e8..000000000000 --- a/x-pack/auditbeat/module/system/processes/config.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package processes - -// Config defines the host metricset's configuration options. -type Config struct { - // TODO: Add config options. -} - -// Validate validates the host metricset config. -func (c *Config) Validate() error { - return nil -} - -var defaultConfig = Config{} diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index 394b33319edb..e33d0d66bbc0 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -7,12 +7,13 @@ package processes import ( "strconv" - "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/beats/x-pack/auditbeat/module/system/config" "github.com/elastic/go-sysinfo/types" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/metric/system/process" @@ -33,68 +34,23 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { mb.BaseMetricSet - cache map[string](*types.ProcessInfo) - log *logp.Logger + config config.Config + cache *cache.Cache + log *logp.Logger } -// New constructs a new MetricSet. -func New(base mb.BaseMetricSet) (mb.MetricSet, error) { - cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) - - config := defaultConfig - if err := base.Module().UnpackConfig(&config); err != nil { - return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) - } - - return &MetricSet{ - BaseMetricSet: base, - log: logp.NewLogger(moduleName), - }, nil +// ProcessInfo wraps the process information and implements cache.Cacheable. +type ProcessInfo struct { + types.ProcessInfo } -// fastHash returns a hash calculated using FNV-1 of the PID and StartTime. -// Based on https://github.com/elastic/gosigar/blob/master/sys/linux/inetdiag.go#L362 -// TODO: Move to go-sysinfo -// Actually, might not need this after all. Delete? -/*func fastHashProcessInfo(pInfo *types.ProcessInfo) uint64 { - h := fnv.New64() - h.Write([]byte(strconv.Itoa(pInfo.PID))) - h.Write([]byte(pInfo.StartTime.String())) - return h.Sum64() -}*/ - -func processInfoNaiveHash(pInfo *types.ProcessInfo) string { +// Hash creates a hash for ProcessInfo. +func (pInfo ProcessInfo) Hash() string { // Could use real hash e.g. FNV if there is an advantage return strconv.Itoa(pInfo.PID) + pInfo.StartTime.String() } -func (ms *MetricSet) diffCache(current []*types.ProcessInfo) (new, missing []*types.ProcessInfo) { - // Check for new - what is in current but not in cache - for _, pInfo := range current { - if _, inCache := ms.cache[processInfoNaiveHash(pInfo)]; !inCache { - new = append(new, pInfo) - } - } - - // Check for missing - what is no longer in current that was in the cache - for cachedPInfoKey, cachedPInfo := range ms.cache { - found := false - for _, currentPInfo := range current { - if processInfoNaiveHash(currentPInfo) == cachedPInfoKey { - found = true - break - } - } - - if !found { - missing = append(missing, cachedPInfo) - } - } - - return -} - -func processInfoToMapStr(pInfo *types.ProcessInfo) common.MapStr { +func (pInfo ProcessInfo) toMapStr() common.MapStr { return common.MapStr{ // https://github.com/elastic/ecs#-process-fields "process.args": pInfo.Args, @@ -108,20 +64,41 @@ func processInfoToMapStr(pInfo *types.ProcessInfo) common.MapStr { } } -// Fetch collects data about the host. It is invoked periodically. +// New constructs a new MetricSet. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) + + config := config.NewDefaultConfig() + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) + } + + ms := &MetricSet{ + BaseMetricSet: base, + config: config, + log: logp.NewLogger(moduleName), + } + + if config.ReportChanges { + ms.cache = cache.New() + } + + return ms, nil +} + +// Fetch checks which processes are running on the host and reports them. +// It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { processInfos := ms.getProcessInfos() - diff := true - if ms.cache != nil && diff { - // find out which processes were stopped or started, if any - started, stopped := ms.diffCache(processInfos) + if ms.cache != nil && !ms.cache.IsEmpty() { + started, stopped := ms.cache.DiffAndUpdateCache(processInfos) for _, pInfo := range started { report.Event(mb.Event{ MetricSetFields: common.MapStr{ "status": "started", - "process": processInfoToMapStr(pInfo), + "process": pInfo.(ProcessInfo).toMapStr(), }, }) } @@ -130,15 +107,16 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { report.Event(mb.Event{ MetricSetFields: common.MapStr{ "status": "stopped", - "process": processInfoToMapStr(pInfo), + "process": pInfo.(ProcessInfo).toMapStr(), }, }) } } else { + // Report all running processes var processEvents []common.MapStr for _, pInfo := range processInfos { - processEvents = append(processEvents, processInfoToMapStr(pInfo)) + processEvents = append(processEvents, pInfo.(ProcessInfo).toMapStr()) } report.Event(mb.Event{ @@ -146,18 +124,15 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { "processes": processEvents, }, }) - } - if diff { - // Refill cache - ms.cache = make(map[string](*types.ProcessInfo)) - for _, pInfo := range processInfos { - ms.cache[processInfoNaiveHash(pInfo)] = pInfo + if ms.cache != nil { + // This will initialize the cache with the current processes + ms.cache.DiffAndUpdateCache(processInfos) } } } -func (ms *MetricSet) getProcessInfos() []*types.ProcessInfo { +func (ms *MetricSet) getProcessInfos() (processInfos []cache.Cacheable) { // TODO: Implement Processes() in go-sysinfo // e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41 pids, err := process.Pids() @@ -165,12 +140,10 @@ func (ms *MetricSet) getProcessInfos() []*types.ProcessInfo { ms.log.Errorw("Failed to fetch the list of PIDs", "error", err) } - var processInfos []*types.ProcessInfo - for _, pid := range pids { if p, err := sysinfo.Process(pid); err == nil { if pInfo, err := p.Info(); err == nil { - processInfos = append(processInfos, &pInfo) + processInfos = append(processInfos, ProcessInfo{pInfo}) } else { ms.log.Errorw("Failed to load process information", "error", err) } @@ -179,5 +152,5 @@ func (ms *MetricSet) getProcessInfos() []*types.ProcessInfo { } } - return processInfos + return } From 43157968da1266daa0812910146b935f89741714 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Mon, 24 Sep 2018 10:52:35 +0100 Subject: [PATCH 04/21] Add fields.yml files and unit test. --- x-pack/auditbeat/cache/cache_test.go | 52 ++++++++++++++ .../module/system/_meta/config.yml.tmpl | 12 ++-- .../module/system/host/_meta/fields.yml | 63 ++++++++++++++++- x-pack/auditbeat/module/system/host/host.go | 67 +++++++++++-------- .../module/system/packages/_meta/fields.yml | 46 ++++++++++++- .../module/system/packages/packages.go | 36 +++++----- .../module/system/processes/_meta/fields.yml | 39 ++++++++++- .../module/system/processes/processes.go | 10 +-- 8 files changed, 263 insertions(+), 62 deletions(-) create mode 100644 x-pack/auditbeat/cache/cache_test.go diff --git a/x-pack/auditbeat/cache/cache_test.go b/x-pack/auditbeat/cache/cache_test.go new file mode 100644 index 000000000000..7dc12dd404b7 --- /dev/null +++ b/x-pack/auditbeat/cache/cache_test.go @@ -0,0 +1,52 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cache + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type CacheTestItem struct { + s string +} + +func (item CacheTestItem) Hash() string { + return item.s + item.s +} + +func TestCache(t *testing.T) { + c := New() + + assert.True(t, c.IsEmpty()) + + oldItems := []Cacheable{ + CacheTestItem{"item1"}, + CacheTestItem{"item2"}, + } + + newItems := []Cacheable{ + CacheTestItem{"item1"}, + CacheTestItem{"item3"}, + } + + new, missing := c.DiffAndUpdateCache(oldItems) + + assert.Equal(t, 2, len(new)) + assert.Equal(t, 0, len(missing)) + assert.False(t, c.IsEmpty()) + + new, missing = c.DiffAndUpdateCache(newItems) + + assert.Equal(t, 1, len(new)) + assert.Equal(t, 1, len(missing)) + + new, missing = c.DiffAndUpdateCache([]Cacheable{}) + + assert.Equal(t, 0, len(new)) + assert.Equal(t, 2, len(missing)) + assert.True(t, c.IsEmpty()) +} diff --git a/x-pack/auditbeat/module/system/_meta/config.yml.tmpl b/x-pack/auditbeat/module/system/_meta/config.yml.tmpl index 80e133ae740b..4e00bdeb6e99 100644 --- a/x-pack/auditbeat/module/system/_meta/config.yml.tmpl +++ b/x-pack/auditbeat/module/system/_meta/config.yml.tmpl @@ -1,15 +1,17 @@ {{ if .Reference -}} {{ end -}} - module: system - {{ if eq .GOOS "darwin" -}} + metricsets: - host + - packages + - processes + + report_changes: true + + {{ if eq .GOOS "darwin" -}} {{ else if eq .GOOS "windows" -}} - metricsets: - - host {{ else -}} - metricsets: - - host {{- end }} {{ if .Reference }} {{- end }} diff --git a/x-pack/auditbeat/module/system/host/_meta/fields.yml b/x-pack/auditbeat/module/system/host/_meta/fields.yml index 5cade6d671b6..99be5222c3d6 100644 --- a/x-pack/auditbeat/module/system/host/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/host/_meta/fields.yml @@ -1,6 +1,67 @@ - name: host type: group description: > - `host` contains TODO. + `host` contains general host information. release: experimental fields: + - name: uptime + type: long + description: > + Uptime in nanoseconds. + - name: boottime + type: date + description: > + Boot time. + - name: containerized + type: boolean + description: > + Set if host is a container. + - name: timezone.name + type: keyword + description: > + Name of the timezone of the host, e.g. BST. + - name: timezone.offset.sec + type: long + description: > + Timezone offset in seconds. + - name: name + type: keyword + description: > + Hostname. + - name: id + type: keyword + description: > + Host ID. + - name: ip + type: ip + description: > + List of IP addresses on this host. + - name: mac + type: keyword + description: > + List of MAC addresses on this host. + - name: architecture + type: keyword + description: > + Host architecture (e.g. x86_64). + - name: os.platform + type: keyword + description: > + OS platform (e.g. centos, ubuntu, windows). + - name: os.name + type: keyword + description: > + OS name (e.g. Mac OS X). + - name: os.family + type: keyword + description: > + OS family (e.g. redhat, debian, freebsd, windows). + - name: os.version + type: keyword + description: > + OS version. + - name: os.kernel + type: keyword + description: > + The operating system's kernel version. + diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index 0b197d716222..47f869bc2fcf 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -42,41 +42,50 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) } - return &MetricSet{ + ms := &MetricSet{ BaseMetricSet: base, log: logp.NewLogger(moduleName), - }, nil + } + + if config.ReportChanges { + // TODO: Implement reporting changes? + ms.log.Warnw("Metricset %v/%v does not support report_changes", moduleName, metricsetName) + } + + return ms, nil } // Fetch collects data about the host. It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { - if host, err := sysinfo.Host(); err == nil { - report.Event(mb.Event{ - MetricSetFields: common.MapStr{ - // https://github.com/elastic/ecs#-host-fields - "uptime": host.Info().Uptime(), - "boottime": host.Info().BootTime, - "containerized": host.Info().Containerized, - "timezone": host.Info().Timezone, - "timezone.offset.sec": host.Info().TimezoneOffsetSec, - "name": host.Info().Hostname, - "id": host.Info().UniqueID, - "ip": host.Info().IPs, - "mac": host.Info().MACs, - // TODO "host.type": ? - "architecture": host.Info().Architecture, + host, err := sysinfo.Host() + if err != nil { + report.Error(errors.Wrap(err, "Failed to load host information")) + return + } + + report.Event(mb.Event{ + MetricSetFields: common.MapStr{ + // https://github.com/elastic/ecs#-host-fields + "uptime": host.Info().Uptime(), + "boottime": host.Info().BootTime, + "containerized": host.Info().Containerized, + "timezone.name": host.Info().Timezone, + "timezone.offset.sec": host.Info().TimezoneOffsetSec, + "name": host.Info().Hostname, + "id": host.Info().UniqueID, + "ip": host.Info().IPs, + "mac": host.Info().MACs, + // TODO "host.type": ? + "architecture": host.Info().Architecture, - // https://github.com/elastic/ecs#-operating-system-fields - "os": common.MapStr{ - "platform": host.Info().OS.Platform, - "name": host.Info().OS.Name, - "family": host.Info().OS.Family, - "version": host.Info().OS.Version, - "kernel": host.Info().KernelVersion, - }, + // https://github.com/elastic/ecs#-operating-system-fields + "os": common.MapStr{ + "platform": host.Info().OS.Platform, + "name": host.Info().OS.Name, + "family": host.Info().OS.Family, + "version": host.Info().OS.Version, + "kernel": host.Info().KernelVersion, }, - }) - } else { - ms.log.Errorw("Failed to load host information", "error", err) - } + }, + }) } diff --git a/x-pack/auditbeat/module/system/packages/_meta/fields.yml b/x-pack/auditbeat/module/system/packages/_meta/fields.yml index baaef240d12c..79cbedcbc67c 100644 --- a/x-pack/auditbeat/module/system/packages/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/packages/_meta/fields.yml @@ -1,6 +1,50 @@ - name: packages type: group description: > - `packages` contains TODO. + `packages` contains information about installed packages. release: experimental fields: + - name: status + type: keyword + description: > + Package change - `installed` or `removed`. + - name: packages + type: group + description: > + List of packages. + fields: + - name: package.name + type: keyword + description: > + Package name. + - name: package.version + type: keyword + description: > + Package version. + - name: package.release + type: keyword + description: > + Package release. + - name: package.arch + type: keyword + description: > + Package architecture. + - name: package.license + type: keyword + description: > + Package license. + - name: package.installtime + type: date + description: > + Package install time. + - name: package.size + type: long + description: > + Package size. + - name: package.summary + description: > + Package summary. + - name: package.url + type: keyword + description: > + Package URL. diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index b2c451bcab12..4c38f68c2b30 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -103,19 +103,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch collects data about the host. It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { - packages := ms.getPackages() - - /*var pkgInfos []common.MapStr - - for _, pkg := range packages { - pkgInfos = append(pkgInfos, pkg.toMapStr()) + packages, err := getPackages() + if err != nil { + report.Error(err) + } + if packages == nil { + return } - - report.Event(mb.Event{ - MetricSetFields: common.MapStr{ - "packages": pkgInfos, - }, - })*/ if ms.cache != nil && !ms.cache.IsEmpty() { installed, removed := ms.cache.DiffAndUpdateCache(packages) @@ -158,38 +152,40 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { } } -func (ms *MetricSet) getPackages() (packages []cache.Cacheable) { +func getPackages() ([]cache.Cacheable, error) { host, err := sysinfo.Host() if err != nil { - ms.log.Errorw("Error getting the OS", "error", err) + return nil, errors.Wrap(err, "Error getting the OS") } hostInfo := host.Info() if hostInfo.OS == nil { - ms.log.Errorw("No OS info from sysinfo.Host", "error", err) + return nil, errors.New("No host info") } + var packages []cache.Cacheable + switch hostInfo.OS.Family { case "redhat": packages, err = listRPMPackages() if err != nil { - ms.log.Errorw("Error getting RPM packages", "error", err) + err = errors.Wrap(err, "Error getting RPM packages") } case "debian": packages, err = listDebPackages() if err != nil { - ms.log.Errorw("Error getting DEB packages", "error", err) + err = errors.Wrap(err, "Error getting DEB packages") } case "darwin": packages, err = listBrewPackages() if err != nil { - ms.log.Errorw("Error getting Homebrew packages", "error", err) + err = errors.Wrap(err, "Error getting Homebrew packages") } default: - ms.log.Errorw("No logic for getting packages for OS family", "os", hostInfo.OS.Family) + return nil, fmt.Errorf("No logic for getting packages for OS family %v", hostInfo.OS.Family) } - return + return packages, err } /* diff --git a/x-pack/auditbeat/module/system/processes/_meta/fields.yml b/x-pack/auditbeat/module/system/processes/_meta/fields.yml index d55488828d71..83c9b4b1c192 100644 --- a/x-pack/auditbeat/module/system/processes/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/processes/_meta/fields.yml @@ -1,6 +1,43 @@ - name: processes type: group description: > - `processes` contains TODO. + `processes` contains process information. release: experimental fields: + - name: status + type: keyword + description: > + Process change - `started` or `stopped`. + - name: processes + type: group + description: > + List of processes. + fields: + - name: process.name + type: keyword + description: > + Process name. + - name: process.args + type: text + description: > + List of process arguments. + - name: process.pid + type: keyword + description: > + Process PID. + - name: process.ppid + type: keyword + description: > + Process Parent PID. + - name: process.cwd + type: text + description: > + Directory from which the process was started. + - name: process.exe + type: text + description: > + Full path of the process executable. + - name: process.starttime + type: date + description: > + Start time of the process. \ No newline at end of file diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index e33d0d66bbc0..f4c7d3d74813 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -53,8 +53,8 @@ func (pInfo ProcessInfo) Hash() string { func (pInfo ProcessInfo) toMapStr() common.MapStr { return common.MapStr{ // https://github.com/elastic/ecs#-process-fields - "process.args": pInfo.Args, "process.name": pInfo.Name, + "process.args": pInfo.Args, "process.pid": pInfo.PID, "process.ppid": pInfo.PPID, @@ -97,8 +97,8 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { for _, pInfo := range started { report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "started", - "process": pInfo.(ProcessInfo).toMapStr(), + "status": "started", + "processes": pInfo.(ProcessInfo).toMapStr(), }, }) } @@ -106,8 +106,8 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { for _, pInfo := range stopped { report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "stopped", - "process": pInfo.(ProcessInfo).toMapStr(), + "status": "stopped", + "processes": pInfo.(ProcessInfo).toMapStr(), }, }) } From 621bc34b6ecf69f6b4a056e66f54dd254a8bdd18 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Tue, 25 Sep 2018 17:39:57 +0100 Subject: [PATCH 05/21] Add system tests. --- auditbeat/tests/system/auditbeat.py | 7 +- libbeat/generator/fields/fields.go | 34 ++- .../fields/module_fields_collector.go | 26 +- libbeat/scripts/Makefile | 2 +- libbeat/scripts/cmd/global_fields/main.go | 29 +- metricbeat/tests/system/metricbeat.py | 8 +- x-pack/auditbeat/Makefile | 7 + x-pack/auditbeat/magefile.go | 258 ++++++++++++++++++ x-pack/auditbeat/main_test.go | 30 ++ .../auditbeat/module/system/_meta/fields.yml | 10 +- .../module/system/packages/_meta/fields.yml | 2 +- .../module/system/processes/_meta/fields.yml | 2 +- .../auditbeat/tests/system/auditbeat_xpack.py | 52 ++++ .../auditbeat/tests/system/test_metricsets.py | 42 +++ 14 files changed, 471 insertions(+), 38 deletions(-) create mode 100644 x-pack/auditbeat/Makefile create mode 100644 x-pack/auditbeat/magefile.go create mode 100644 x-pack/auditbeat/main_test.go create mode 100644 x-pack/auditbeat/tests/system/auditbeat_xpack.py create mode 100644 x-pack/auditbeat/tests/system/test_metricsets.py diff --git a/auditbeat/tests/system/auditbeat.py b/auditbeat/tests/system/auditbeat.py index 1c0f4e816b89..3b3dfd30354d 100644 --- a/auditbeat/tests/system/auditbeat.py +++ b/auditbeat/tests/system/auditbeat.py @@ -15,8 +15,11 @@ class BaseTest(MetricbeatTest): @classmethod def setUpClass(self): self.beat_name = "auditbeat" - self.beat_path = os.path.abspath( - os.path.join(os.path.dirname(__file__), "../../")) + + if not hasattr(self, 'beat_path'): + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) + super(MetricbeatTest, self).setUpClass() def create_file(self, path, contents): diff --git a/libbeat/generator/fields/fields.go b/libbeat/generator/fields/fields.go index 9449f55de907..87c2cd46c818 100644 --- a/libbeat/generator/fields/fields.go +++ b/libbeat/generator/fields/fields.go @@ -31,6 +31,27 @@ type YmlFile struct { Indent int } +// NewYmlFile performs some checks and then creates and returns a YmlFile struct +func NewYmlFile(path string, indent int) (*YmlFile, error) { + _, err := os.Stat(path) + + if os.IsNotExist(err) { + // skip + return nil, nil + } + + if err != nil { + // return error + return nil, err + } + + // All good, return file + return &YmlFile{ + Path: path, + Indent: indent, + }, nil +} + func collectCommonFiles(esBeatsPath, beatPath string, fieldFiles []*YmlFile) ([]*YmlFile, error) { commonFields := []string{ filepath.Join(beatPath, "_meta/fields.common.yml"), @@ -52,16 +73,13 @@ func collectCommonFiles(esBeatsPath, beatPath string, fieldFiles []*YmlFile) ([] var files []*YmlFile for _, cf := range commonFields { - _, err := os.Stat(cf) - if os.IsNotExist(err) { - continue - } else if err != nil { + ymlFile, err := NewYmlFile(cf, 0) + + if err != nil { return nil, err + } else if ymlFile != nil { + files = append(files, ymlFile) } - files = append(files, &YmlFile{ - Path: cf, - Indent: 0, - }) } files = append(files, libbeatFieldFiles...) diff --git a/libbeat/generator/fields/module_fields_collector.go b/libbeat/generator/fields/module_fields_collector.go index f9feeefc4b6e..a095b16aae19 100644 --- a/libbeat/generator/fields/module_fields_collector.go +++ b/libbeat/generator/fields/module_fields_collector.go @@ -19,7 +19,6 @@ package fields import ( "io/ioutil" - "os" "path/filepath" ) @@ -69,16 +68,15 @@ func CollectModuleFiles(modulesDir string) ([]*YmlFile, error) { // CollectFiles collects all files for the given module including filesets func CollectFiles(module string, modulesPath string) ([]*YmlFile, error) { - var files []*YmlFile + fieldsYmlPath := filepath.Join(modulesPath, module, "_meta/fields.yml") - if _, err := os.Stat(fieldsYmlPath); !os.IsNotExist(err) { - files = append(files, &YmlFile{ - Path: fieldsYmlPath, - Indent: 0, - }) - } else if !os.IsNotExist(err) && err != nil { + ymlFile, err := NewYmlFile(fieldsYmlPath, 0) + + if err != nil { return nil, err + } else if ymlFile != nil { + files = append(files, ymlFile) } modulesRoot := filepath.Base(modulesPath) @@ -91,14 +89,14 @@ func CollectFiles(module string, modulesPath string) ([]*YmlFile, error) { if !s.IsDir() { continue } + fieldsYmlPath = filepath.Join(modulesPath, module, s.Name(), "_meta/fields.yml") - if _, err = os.Stat(fieldsYmlPath); !os.IsNotExist(err) { - files = append(files, &YmlFile{ - Path: fieldsYmlPath, - Indent: indentByModule[modulesRoot], - }) - } else if !os.IsNotExist(err) && err != nil { + ymlFile, err := NewYmlFile(fieldsYmlPath, indentByModule[modulesRoot]) + + if err != nil { return nil, err + } else if ymlFile != nil { + files = append(files, ymlFile) } } return files, nil diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index 916cd6e4ee02..ae7feff8c8ba 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -201,7 +201,7 @@ integration-tests-environment: prepare-tests build-image system-tests: ## @testing Runs the system tests system-tests: prepare-tests ${BEAT_NAME}.test python-env . ${PYTHON_ENV}/bin/activate; INTEGRATION_TESTS=${INTEGRATION_TESTS} TESTING_ENVIRONMENT=${TESTING_ENVIRONMENT} DOCKER_COMPOSE_PROJECT_NAME=${DOCKER_COMPOSE_PROJECT_NAME} nosetests ${PYTHON_TEST_FILES} ${NOSETESTS_OPTIONS} - python ${ES_BEATS}/dev-tools/aggregate_coverage.py -o ${COVERAGE_DIR}/system.cov ./build/system-tests/run + python ${ES_BEATS}/dev-tools/aggregate_coverage.py -o ${COVERAGE_DIR}/system.cov ${BUILD_DIR}/system-tests/run # Runs the system tests .PHONY: system-tests-environment diff --git a/libbeat/scripts/cmd/global_fields/main.go b/libbeat/scripts/cmd/global_fields/main.go index 238ce040b8b8..2b77b6a9e85a 100644 --- a/libbeat/scripts/cmd/global_fields/main.go +++ b/libbeat/scripts/cmd/global_fields/main.go @@ -22,6 +22,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "github.com/elastic/beats/libbeat/generator/fields" ) @@ -78,15 +79,29 @@ func main() { var fieldsFiles []*fields.YmlFile for _, fieldsFilePath := range beatFieldsPaths { - pathToModules := filepath.Join(beatPath, fieldsFilePath) + fullPath := filepath.Join(beatPath, fieldsFilePath) - fieldsFile, err := fields.CollectModuleFiles(pathToModules) - if err != nil { - fmt.Fprintf(os.Stderr, "Cannot collect fields.yml files: %+v\n", err) - os.Exit(2) - } + isFile := strings.HasSuffix(fullPath, ".yml") + if isFile { + file, err := fields.NewYmlFile(fullPath, 0) + + if err != nil || file == nil { + fmt.Fprintf(os.Stderr, "Cannot collect fields.yml file: %+v\n", err) + os.Exit(2) + } + + fieldsFiles = append(fieldsFiles, file) + } else { + // fullPath is a directory + files, err := fields.CollectModuleFiles(fullPath) - fieldsFiles = append(fieldsFiles, fieldsFile...) + if err != nil { + fmt.Fprintf(os.Stderr, "Cannot collect fields.yml files: %+v\n", err) + os.Exit(2) + } + + fieldsFiles = append(fieldsFiles, files...) + } } err = fields.Generate(esBeatsPath, beatPath, fieldsFiles, output) diff --git a/metricbeat/tests/system/metricbeat.py b/metricbeat/tests/system/metricbeat.py index 8308a9c0e765..ae53300c6534 100644 --- a/metricbeat/tests/system/metricbeat.py +++ b/metricbeat/tests/system/metricbeat.py @@ -19,8 +19,12 @@ class BaseTest(TestCase): @classmethod def setUpClass(self): - self.beat_name = "metricbeat" - self.beat_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) + if not hasattr(self, 'beat_name'): + self.beat_name = "metricbeat" + + if not hasattr(self, 'beat_path'): + self.beat_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) + super(BaseTest, self).setUpClass() def de_dot(self, existing_fields): diff --git a/x-pack/auditbeat/Makefile b/x-pack/auditbeat/Makefile new file mode 100644 index 000000000000..5ab94b36bffb --- /dev/null +++ b/x-pack/auditbeat/Makefile @@ -0,0 +1,7 @@ +BEAT_NAME=auditbeat +ES_BEATS?=../.. +XPACK_BEAT_PATH?=github.com/elastic/beats/x-pack/${BEAT_NAME} +GOPACKAGES?=$(shell go list ${BEAT_PATH}/... ${XPACK_BEAT_PATH}/... | grep -v /vendor/ | grep -v /scripts/cmd/ ) + +# Include main auditbeat Makefile +include ${ES_BEATS}/${BEAT_NAME}/Makefile diff --git a/x-pack/auditbeat/magefile.go b/x-pack/auditbeat/magefile.go new file mode 100644 index 000000000000..12b8c957711c --- /dev/null +++ b/x-pack/auditbeat/magefile.go @@ -0,0 +1,258 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build mage + +package main + +import ( + "context" + "fmt" + "regexp" + "time" + + "github.com/magefile/mage/mg" + "github.com/magefile/mage/sh" + "github.com/pkg/errors" + + "github.com/elastic/beats/dev-tools/mage" +) + +func init() { + mage.BeatDescription = "Audit the activities of users and processes on your system." +} + +// Build builds the Beat binary. +func Build() error { + return mage.Build(mage.DefaultBuildArgs()) +} + +// GolangCrossBuild build the Beat binary inside of the golang-builder. +// Do not use directly, use crossBuild instead. +func GolangCrossBuild() error { + return mage.GolangCrossBuild(mage.DefaultGolangCrossBuildArgs()) +} + +// BuildGoDaemon builds the go-daemon binary (use crossBuildGoDaemon). +func BuildGoDaemon() error { + return mage.BuildGoDaemon() +} + +// CrossBuild cross-builds the beat for all target platforms. +func CrossBuild() error { + return mage.CrossBuild() +} + +// CrossBuildXPack cross-builds the beat with XPack for all target platforms. +func CrossBuildXPack() error { + return mage.CrossBuildXPack() +} + +// CrossBuildGoDaemon cross-builds the go-daemon binary using Docker. +func CrossBuildGoDaemon() error { + return mage.CrossBuildGoDaemon() +} + +// Clean cleans all generated files and build artifacts. +func Clean() error { + return mage.Clean() +} + +// Package packages the Beat for distribution. +// Use SNAPSHOT=true to build snapshots. +// Use PLATFORMS to control the target platforms. +func Package() { + start := time.Now() + defer func() { fmt.Println("package ran for", time.Since(start)) }() + + mage.UseElasticBeatPackaging() + customizePackaging() + + mg.Deps(Update) + mg.Deps(makeConfigTemplates, CrossBuild, CrossBuildXPack, CrossBuildGoDaemon) + mg.SerialDeps(mage.Package, TestPackages) +} + +// TestPackages tests the generated packages (i.e. file modes, owners, groups). +func TestPackages() error { + return mage.TestPackages() +} + +// Update updates the generated files (aka make update). +func Update() error { + return sh.Run("make", "update") +} + +// Fields generates a fields.yml for the Beat. +func Fields() error { + return mage.GenerateFieldsYAML("module", "../../auditbeat/module", "../../auditbeat/_meta/fields.common.yml") +} + +// GoTestUnit executes the Go unit tests. +// Use TEST_COVERAGE=true to enable code coverage profiling. +// Use RACE_DETECTOR=true to enable the race detector. +func GoTestUnit(ctx context.Context) error { + return mage.GoTest(ctx, mage.DefaultGoTestUnitArgs()) +} + +// GoTestIntegration executes the Go integration tests. +// Use TEST_COVERAGE=true to enable code coverage profiling. +// Use RACE_DETECTOR=true to enable the race detector. +func GoTestIntegration(ctx context.Context) error { + return mage.GoTest(ctx, mage.DefaultGoTestIntegrationArgs()) +} + +// ----------------------------------------------------------------------------- +// Customizations specific to Auditbeat. +// - Config files are Go templates. + +const ( + configTemplateGlob = "module/*/_meta/config*.yml.tmpl" + shortConfigTemplate = "build/auditbeat.yml.tmpl" + referenceConfigTemplate = "build/auditbeat.reference.yml.tmpl" +) + +func makeConfigTemplates() error { + configFiles, err := mage.FindFiles(configTemplateGlob) + if err != nil { + return errors.Wrap(err, "failed to find config templates") + } + + var shortIn []string + shortIn = append(shortIn, "_meta/common.p1.yml") + shortIn = append(shortIn, configFiles...) + shortIn = append(shortIn, "_meta/common.p2.yml") + shortIn = append(shortIn, "../libbeat/_meta/config.yml") + if !mage.IsUpToDate(shortConfigTemplate, shortIn...) { + fmt.Println(">> Building", shortConfigTemplate) + mage.MustFileConcat(shortConfigTemplate, 0600, shortIn...) + mage.MustFindReplace(shortConfigTemplate, regexp.MustCompile("beatname"), "{{.BeatName}}") + mage.MustFindReplace(shortConfigTemplate, regexp.MustCompile("beat-index-prefix"), "{{.BeatIndexPrefix}}") + } + + var referenceIn []string + referenceIn = append(referenceIn, "_meta/common.reference.yml") + referenceIn = append(referenceIn, configFiles...) + referenceIn = append(referenceIn, "../libbeat/_meta/config.reference.yml") + if !mage.IsUpToDate(referenceConfigTemplate, referenceIn...) { + fmt.Println(">> Building", referenceConfigTemplate) + mage.MustFileConcat(referenceConfigTemplate, 0644, referenceIn...) + mage.MustFindReplace(referenceConfigTemplate, regexp.MustCompile("beatname"), "{{.BeatName}}") + mage.MustFindReplace(referenceConfigTemplate, regexp.MustCompile("beat-index-prefix"), "{{.BeatIndexPrefix}}") + } + + return nil +} + +// customizePackaging modifies the package specs to use templated config files +// instead of the defaults. +// +// Customizations specific to Auditbeat: +// - Include audit.rules.d directory in packages. +func customizePackaging() { + var ( + shortConfig = mage.PackageFile{ + Mode: 0600, + Source: "{{.PackageDir}}/auditbeat.yml", + Dep: generateShortConfig, + Config: true, + } + referenceConfig = mage.PackageFile{ + Mode: 0644, + Source: "{{.PackageDir}}/auditbeat.reference.yml", + Dep: generateReferenceConfig, + } + ) + + archiveRulesDir := "audit.rules.d" + linuxPkgRulesDir := "/etc/{{.BeatName}}/audit.rules.d" + rulesSrcDir := "module/auditd/_meta/audit.rules.d" + sampleRules := mage.PackageFile{ + Mode: 0644, + Source: rulesSrcDir, + Dep: func(spec mage.PackageSpec) error { + if spec.OS == "linux" { + params := map[string]interface{}{ + "ArchBits": archBits, + } + rulesFile := spec.MustExpand(rulesSrcDir+"/sample-rules-linux-{{call .ArchBits .GOARCH}}bit.conf", params) + if err := mage.Copy(rulesFile, spec.MustExpand("{{.PackageDir}}/audit.rules.d/sample-rules.conf.disabled")); err != nil { + return errors.Wrap(err, "failed to copy sample rules") + } + } + return nil + }, + } + + for _, args := range mage.Packages { + pkgType := args.Types[0] + switch pkgType { + case mage.TarGz, mage.Zip: + args.Spec.ReplaceFile("{{.BeatName}}.yml", shortConfig) + args.Spec.ReplaceFile("{{.BeatName}}.reference.yml", referenceConfig) + case mage.Deb, mage.RPM, mage.DMG: + args.Spec.ReplaceFile("/etc/{{.BeatName}}/{{.BeatName}}.yml", shortConfig) + args.Spec.ReplaceFile("/etc/{{.BeatName}}/{{.BeatName}}.reference.yml", referenceConfig) + default: + panic(errors.Errorf("unhandled package type: %v", pkgType)) + } + if args.OS == "linux" { + rulesDest := archiveRulesDir + if pkgType != mage.TarGz { + rulesDest = linuxPkgRulesDir + } + args.Spec.Files[rulesDest] = sampleRules + } + } +} + +func generateReferenceConfig(spec mage.PackageSpec) error { + params := map[string]interface{}{ + "Reference": true, + "ArchBits": archBits, + } + return spec.ExpandFile(referenceConfigTemplate, + "{{.PackageDir}}/auditbeat.reference.yml", params) +} + +func generateShortConfig(spec mage.PackageSpec) error { + params := map[string]interface{}{ + "Reference": false, + "ArchBits": archBits, + } + return spec.ExpandFile(shortConfigTemplate, + "{{.PackageDir}}/auditbeat.yml", params) +} + +// archBits returns the number of bit width of the GOARCH architecture value. +// This function is used by the auditd module configuration templates to +// generate architecture specific audit rules. +func archBits(goarch string) int { + switch goarch { + case "386", "arm": + return 32 + default: + return 64 + } +} + +// Configs generates the auditbeat.yml and auditbeat.reference.yml config files. +// Set DEV_OS and DEV_ARCH to change the target host for the generated configs. +// Defaults to linux/amd64. +func Configs() { + mg.Deps(makeConfigTemplates) + + params := map[string]interface{}{ + "GOOS": mage.EnvOr("DEV_OS", "linux"), + "GOARCH": mage.EnvOr("DEV_ARCH", "amd64"), + "ArchBits": archBits, + "Reference": false, + } + fmt.Printf(">> Building auditbeat.yml for %v/%v\n", params["GOOS"], params["GOARCH"]) + mage.MustExpandFile(shortConfigTemplate, "auditbeat.yml", params) + + params["Reference"] = true + fmt.Printf(">> Building auditbeat.reference.yml for %v/%v\n", params["GOOS"], params["GOARCH"]) + mage.MustExpandFile(referenceConfigTemplate, "auditbeat.reference.yml", params) +} diff --git a/x-pack/auditbeat/main_test.go b/x-pack/auditbeat/main_test.go new file mode 100644 index 000000000000..1eca7b4f54df --- /dev/null +++ b/x-pack/auditbeat/main_test.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +// This file is mandatory as otherwise the auditbeat.test binary is not generated correctly. + +import ( + "flag" + "testing" + + "github.com/elastic/beats/auditbeat/cmd" +) + +var systemTest *bool + +func init() { + systemTest = flag.Bool("systemTest", false, "Set to true when running system tests") + + cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("systemTest")) + cmd.RootCmd.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("test.coverprofile")) +} + +// Test started when the test binary is started. Only calls main. +func TestSystem(t *testing.T) { + if *systemTest { + main() + } +} diff --git a/x-pack/auditbeat/module/system/_meta/fields.yml b/x-pack/auditbeat/module/system/_meta/fields.yml index 36667c7b0025..d2ad0830d35f 100644 --- a/x-pack/auditbeat/module/system/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/_meta/fields.yml @@ -1,4 +1,10 @@ - key: system - title: System - description: These are the fields generated by the system module. + title: "System" + description: > + These are the fields generated by the system module. + release: experimental fields: + - name: system + type: group + description: > + fields: \ No newline at end of file diff --git a/x-pack/auditbeat/module/system/packages/_meta/fields.yml b/x-pack/auditbeat/module/system/packages/_meta/fields.yml index 79cbedcbc67c..383dd84becf6 100644 --- a/x-pack/auditbeat/module/system/packages/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/packages/_meta/fields.yml @@ -9,7 +9,7 @@ description: > Package change - `installed` or `removed`. - name: packages - type: group + type: array description: > List of packages. fields: diff --git a/x-pack/auditbeat/module/system/processes/_meta/fields.yml b/x-pack/auditbeat/module/system/processes/_meta/fields.yml index 83c9b4b1c192..c65f644ba97e 100644 --- a/x-pack/auditbeat/module/system/processes/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/processes/_meta/fields.yml @@ -9,7 +9,7 @@ description: > Process change - `started` or `stopped`. - name: processes - type: group + type: array description: > List of processes. fields: diff --git a/x-pack/auditbeat/tests/system/auditbeat_xpack.py b/x-pack/auditbeat/tests/system/auditbeat_xpack.py new file mode 100644 index 000000000000..1941e12d97d5 --- /dev/null +++ b/x-pack/auditbeat/tests/system/auditbeat_xpack.py @@ -0,0 +1,52 @@ +import jinja2 +import os + +from auditbeat import BaseTest as AuditbeatTest + +class AuditbeatXPackTest(AuditbeatTest): + + @classmethod + def setUpClass(self): + self.beat_name = "auditbeat" + self.beat_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "../../")) + + super(AuditbeatTest, self).setUpClass() + + + def setUp(self): + super(AuditbeatTest, self).setUp() + + # Hack to make jinja2 have the right paths + self.template_env = jinja2.Environment( + loader=jinja2.FileSystemLoader([ + os.path.abspath(os.path.join(self.beat_path, "../../auditbeat")), + os.path.abspath(os.path.join(self.beat_path, "../../libbeat")) + ]) + ) + + # Adapted from metricbeat.py + def check_metricset(self, module, metricset, fields=[], warnings_allowed=False): + """ + Method to test a metricset for its fields + """ + self.render_config_template(modules=[{ + "name": module, + "metricsets": [metricset], + "period": "10s", + }]) + proc = self.start_beat() + self.wait_until(lambda: self.output_lines() > 0) + proc.check_kill_and_wait() + + if not warnings_allowed: + self.assert_no_logged_warnings() + + output = self.read_output_json() + self.assertTrue(len(output) >= 1) + evt = output[0] + print(evt) + + self.assertItemsEqual(self.de_dot(fields), evt.keys()) + + self.assert_fields_are_documented(evt) \ No newline at end of file diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py new file mode 100644 index 000000000000..d1cd0f632577 --- /dev/null +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -0,0 +1,42 @@ +import jinja2 +import os +import sys +import time + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../auditbeat/tests/system')) + +from auditbeat_xpack import * + +COMMON_FIELDS = ["@timestamp", "beat.version", "host.name", "event.module", "event.dataset"] + +class Test(AuditbeatXPackTest): + + def test_metricset_host(self): + """ + host metricset collects general information about a server. + """ + + fields = ["system.host.uptime", "system.host.ip", "system.host.os.name"] + + # Metricset is experimental and that generates a warning, TODO: remove later + self.check_metricset("system", "host", COMMON_FIELDS + fields, warnings_allowed=True) + + def test_metricset_packages(self): + """ + packages metricset collects information about installed packages on a system. + """ + + fields = ["system.packages.packages.package.name"] + + # Metricset is experimental and that generates a warning, TODO: remove later + self.check_metricset("system", "packages", COMMON_FIELDS + fields, warnings_allowed=True) + + def test_metricset_processes(self): + """ + processes metricset collects information about processes running on a system. + """ + + fields = ["system.processes.processes.process.name"] + + # Metricset is experimental and that generates a warning, TODO: remove later + self.check_metricset("system", "processes", COMMON_FIELDS + fields, warnings_allowed=True) From ee76d3b10e624c828c90ef100a39b1025a2d7a2b Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Tue, 25 Sep 2018 17:58:02 +0100 Subject: [PATCH 06/21] Minor fixes. --- x-pack/auditbeat/module/system/_meta/fields.yml | 3 ++- x-pack/auditbeat/module/system/packages/packages.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/auditbeat/module/system/_meta/fields.yml b/x-pack/auditbeat/module/system/_meta/fields.yml index d2ad0830d35f..b5671726ab26 100644 --- a/x-pack/auditbeat/module/system/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/_meta/fields.yml @@ -7,4 +7,5 @@ - name: system type: group description: > - fields: \ No newline at end of file + fields: + diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index 4c38f68c2b30..153a34444be6 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -336,7 +336,7 @@ func listBrewPackages() ([]cache.Cacheable, error) { scanner := bufio.NewScanner(file) count := 15 // only look into the first few lines of the formula for scanner.Scan() { - count -= 1 + count-- if count == 0 { break } From 066a9b0fb6abff3a832b839639109a69e7edc14d Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Tue, 25 Sep 2018 20:37:03 +0100 Subject: [PATCH 07/21] Elastic license check and some formatting. --- x-pack/auditbeat/Makefile | 22 ++++++++++++++++++- .../module/system/processes/processes.go | 3 ++- .../auditbeat/tests/system/auditbeat_xpack.py | 4 ++-- .../auditbeat/tests/system/test_metricsets.py | 7 +++--- 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/x-pack/auditbeat/Makefile b/x-pack/auditbeat/Makefile index 5ab94b36bffb..1776815a0c51 100644 --- a/x-pack/auditbeat/Makefile +++ b/x-pack/auditbeat/Makefile @@ -4,4 +4,24 @@ XPACK_BEAT_PATH?=github.com/elastic/beats/x-pack/${BEAT_NAME} GOPACKAGES?=$(shell go list ${BEAT_PATH}/... ${XPACK_BEAT_PATH}/... | grep -v /vendor/ | grep -v /scripts/cmd/ ) # Include main auditbeat Makefile -include ${ES_BEATS}/${BEAT_NAME}/Makefile +include ${ES_BEATS}/${BEAT_NAME}/Makefile + +# Overwrite check-headers - check for Apache license in +# auditbeat/ and Elastic license in xpack/auditbeat/ +.PHONY: check-headers +check-headers: +ifndef CHECK_HEADERS_DISABLED + @go get -u github.com/elastic/go-licenser + @go-licenser -d -license ${LICENSE} ${ES_BEATS}/${BEAT_NAME} + @go-licenser -d -license Elastic . +endif + +# Overwrite check-headers - insert Apache license in +# auditbeat/ and Elastic license in xpack/auditbeat/ +.PHONY: add-headers +add-headers: +ifndef CHECK_HEADERS_DISABLED + @go get github.com/elastic/go-licenser + @go-licenser -license ${LICENSE} ${ES_BEATS}/${BEAT_NAME} + @go-licenser -license Elastic . +endif \ No newline at end of file diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index f4c7d3d74813..61711e77edde 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -7,13 +7,14 @@ package processes import ( "strconv" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" "github.com/elastic/beats/x-pack/auditbeat/module/system/config" "github.com/elastic/go-sysinfo/types" - "github.com/pkg/errors" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/metric/system/process" diff --git a/x-pack/auditbeat/tests/system/auditbeat_xpack.py b/x-pack/auditbeat/tests/system/auditbeat_xpack.py index 1941e12d97d5..eeb6a39b7d50 100644 --- a/x-pack/auditbeat/tests/system/auditbeat_xpack.py +++ b/x-pack/auditbeat/tests/system/auditbeat_xpack.py @@ -3,6 +3,7 @@ from auditbeat import BaseTest as AuditbeatTest + class AuditbeatXPackTest(AuditbeatTest): @classmethod @@ -13,7 +14,6 @@ def setUpClass(self): super(AuditbeatTest, self).setUpClass() - def setUp(self): super(AuditbeatTest, self).setUp() @@ -49,4 +49,4 @@ def check_metricset(self, module, metricset, fields=[], warnings_allowed=False): self.assertItemsEqual(self.de_dot(fields), evt.keys()) - self.assert_fields_are_documented(evt) \ No newline at end of file + self.assert_fields_are_documented(evt) diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py index d1cd0f632577..58d06ce869f8 100644 --- a/x-pack/auditbeat/tests/system/test_metricsets.py +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -9,6 +9,7 @@ COMMON_FIELDS = ["@timestamp", "beat.version", "host.name", "event.module", "event.dataset"] + class Test(AuditbeatXPackTest): def test_metricset_host(self): @@ -17,7 +18,7 @@ def test_metricset_host(self): """ fields = ["system.host.uptime", "system.host.ip", "system.host.os.name"] - + # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "host", COMMON_FIELDS + fields, warnings_allowed=True) @@ -27,7 +28,7 @@ def test_metricset_packages(self): """ fields = ["system.packages.packages.package.name"] - + # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "packages", COMMON_FIELDS + fields, warnings_allowed=True) @@ -37,6 +38,6 @@ def test_metricset_processes(self): """ fields = ["system.processes.processes.process.name"] - + # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "processes", COMMON_FIELDS + fields, warnings_allowed=True) From 6085e6927a542ca2728596d91ea86f799dc1b234 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Tue, 25 Sep 2018 21:17:22 +0100 Subject: [PATCH 08/21] Formatting and making autopep work from xpack/. --- auditbeat/tests/system/auditbeat.py | 4 ++-- libbeat/scripts/Makefile | 2 +- x-pack/auditbeat/Makefile | 5 +++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/auditbeat/tests/system/auditbeat.py b/auditbeat/tests/system/auditbeat.py index 3b3dfd30354d..8892afbe1027 100644 --- a/auditbeat/tests/system/auditbeat.py +++ b/auditbeat/tests/system/auditbeat.py @@ -15,11 +15,11 @@ class BaseTest(MetricbeatTest): @classmethod def setUpClass(self): self.beat_name = "auditbeat" - + if not hasattr(self, 'beat_path'): self.beat_path = os.path.abspath( os.path.join(os.path.dirname(__file__), "../../")) - + super(MetricbeatTest, self).setUpClass() def create_file(self, path, contents): diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index ae7feff8c8ba..286e0b712065 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -80,7 +80,7 @@ PIP_INSTALL_PARAMS?= BUILDID?=$(shell git rev-parse HEAD) ## @Building The build ID VIRTUALENV_PARAMS?= INTEGRATION_TESTS?= -FIND=. ${PYTHON_ENV}/bin/activate; find . -type f -not -path "*/vendor/*" -not -path "*/build/*" -not -path "*/.git/*" +FIND?=. ${PYTHON_ENV}/bin/activate; find . -type f -not -path "*/vendor/*" -not -path "*/build/*" -not -path "*/.git/*" PERM_EXEC?=$(shell [ `uname -s` = "Darwin" ] && echo "+111" || echo "/a+x") ifeq ($(DOCKER_CACHE),0) diff --git a/x-pack/auditbeat/Makefile b/x-pack/auditbeat/Makefile index 1776815a0c51..00efca3d7e55 100644 --- a/x-pack/auditbeat/Makefile +++ b/x-pack/auditbeat/Makefile @@ -1,7 +1,8 @@ BEAT_NAME=auditbeat -ES_BEATS?=../.. +ES_BEATS=../.. XPACK_BEAT_PATH?=github.com/elastic/beats/x-pack/${BEAT_NAME} -GOPACKAGES?=$(shell go list ${BEAT_PATH}/... ${XPACK_BEAT_PATH}/... | grep -v /vendor/ | grep -v /scripts/cmd/ ) +GOPACKAGES=$(shell go list ${BEAT_PATH}/... ${XPACK_BEAT_PATH}/... | grep -v /vendor/ | grep -v /scripts/cmd/ ) +FIND=. ${PYTHON_ENV}/bin/activate; find . ${ES_BEATS}/${BEAT_NAME} -type f -not -path "*/vendor/*" -not -path "*/build/*" -not -path "*/.git/*" # Include main auditbeat Makefile include ${ES_BEATS}/${BEAT_NAME}/Makefile From 5ad0e5696cc2d4929a5725c9969da925a321bf10 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Wed, 26 Sep 2018 00:14:08 +0100 Subject: [PATCH 09/21] Fix formatting in metricbeat. --- metricbeat/tests/system/metricbeat.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/tests/system/metricbeat.py b/metricbeat/tests/system/metricbeat.py index ae53300c6534..dc80d7060656 100644 --- a/metricbeat/tests/system/metricbeat.py +++ b/metricbeat/tests/system/metricbeat.py @@ -21,10 +21,10 @@ class BaseTest(TestCase): def setUpClass(self): if not hasattr(self, 'beat_name'): self.beat_name = "metricbeat" - + if not hasattr(self, 'beat_path'): self.beat_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")) - + super(BaseTest, self).setUpClass() def de_dot(self, existing_fields): From fcab22c351fa863d5576c18a4277926e647c9a37 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Thu, 27 Sep 2018 13:40:14 +0100 Subject: [PATCH 10/21] Improve error handling. --- x-pack/auditbeat/module/system/host/host.go | 4 +++- .../module/system/packages/packages.go | 1 + .../module/system/processes/processes.go | 24 ++++++++++++++----- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index 47f869bc2fcf..bd29af05f072 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -59,7 +59,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (ms *MetricSet) Fetch(report mb.ReporterV2) { host, err := sysinfo.Host() if err != nil { - report.Error(errors.Wrap(err, "Failed to load host information")) + errW := errors.Wrap(err, "Failed to load host information") + ms.log.Error(errW) + report.Error(errW) return } diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index 153a34444be6..6598b5a2e7f9 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -105,6 +105,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { func (ms *MetricSet) Fetch(report mb.ReporterV2) { packages, err := getPackages() if err != nil { + ms.log.Error(err) report.Error(err) } if packages == nil { diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index 61711e77edde..abf4c4b80c76 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -90,7 +90,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch checks which processes are running on the host and reports them. // It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { - processInfos := ms.getProcessInfos() + processInfos, errorList := ms.getProcessInfos() + if len(errorList) != 0 { + for _, err := range errorList { + ms.log.Error(err) + report.Error(err) + } + } + if processInfos == nil { + return + } if ms.cache != nil && !ms.cache.IsEmpty() { started, stopped := ms.cache.DiffAndUpdateCache(processInfos) @@ -133,25 +142,28 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { } } -func (ms *MetricSet) getProcessInfos() (processInfos []cache.Cacheable) { +func (ms *MetricSet) getProcessInfos() ([]cache.Cacheable, []error) { // TODO: Implement Processes() in go-sysinfo // e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41 pids, err := process.Pids() if err != nil { - ms.log.Errorw("Failed to fetch the list of PIDs", "error", err) + return nil, []error{errors.Wrap(err, "Failed to fetch the list of PIDs")} } + var processInfos []cache.Cacheable + var errorList []error + for _, pid := range pids { if p, err := sysinfo.Process(pid); err == nil { if pInfo, err := p.Info(); err == nil { processInfos = append(processInfos, ProcessInfo{pInfo}) } else { - ms.log.Errorw("Failed to load process information", "error", err) + errorList = append(errorList, errors.Wrap(err, "Failed to load process information")) } } else { - ms.log.Errorw("Failed to load process", "error", err) + errorList = append(errorList, errors.Wrap(err, "Failed to load process")) } } - return + return processInfos, errorList } From e99c4db7f06b11d6636b16709930ca7ed97d85d6 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Thu, 27 Sep 2018 17:03:27 +0100 Subject: [PATCH 11/21] Skip processes system test on macOS if not root. --- x-pack/auditbeat/tests/system/test_metricsets.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py index 58d06ce869f8..e217cdbba362 100644 --- a/x-pack/auditbeat/tests/system/test_metricsets.py +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -2,6 +2,7 @@ import os import sys import time +import unittest sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../auditbeat/tests/system')) @@ -32,6 +33,7 @@ def test_metricset_packages(self): # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "packages", COMMON_FIELDS + fields, warnings_allowed=True) + @unittest.skipIf(sys.platform == "darwin" and os.geteuid != 0, "Requires root on macOS") def test_metricset_processes(self): """ processes metricset collects information about processes running on a system. From 018828ce86e0e35365c340d5a40c9ed2d957dea1 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Fri, 28 Sep 2018 12:33:37 +0100 Subject: [PATCH 12/21] Change to {processes|packages}.report_changes. --- x-pack/auditbeat/module/system/host/host.go | 17 ++--------------- .../system/{config => packages}/config.go | 11 ++++------- .../module/system/packages/packages.go | 5 ++--- .../module/system/processes/config.go | 19 +++++++++++++++++++ .../module/system/processes/processes.go | 5 ++--- 5 files changed, 29 insertions(+), 28 deletions(-) rename x-pack/auditbeat/module/system/{config => packages}/config.go (67%) create mode 100644 x-pack/auditbeat/module/system/processes/config.go diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index bd29af05f072..242fdcfa5014 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -10,7 +10,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/x-pack/auditbeat/module/system/config" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/go-sysinfo" @@ -37,22 +36,10 @@ type MetricSet struct { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) - config := config.NewDefaultConfig() - if err := base.Module().UnpackConfig(&config); err != nil { - return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) - } - - ms := &MetricSet{ + return &MetricSet{ BaseMetricSet: base, log: logp.NewLogger(moduleName), - } - - if config.ReportChanges { - // TODO: Implement reporting changes? - ms.log.Warnw("Metricset %v/%v does not support report_changes", moduleName, metricsetName) - } - - return ms, nil + }, nil } // Fetch collects data about the host. It is invoked periodically. diff --git a/x-pack/auditbeat/module/system/config/config.go b/x-pack/auditbeat/module/system/packages/config.go similarity index 67% rename from x-pack/auditbeat/module/system/config/config.go rename to x-pack/auditbeat/module/system/packages/config.go index 7a120607c0df..05fdce2fcb16 100644 --- a/x-pack/auditbeat/module/system/config/config.go +++ b/x-pack/auditbeat/module/system/packages/config.go @@ -2,11 +2,11 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package config +package packages // Config defines the host metricset's configuration options. type Config struct { - ReportChanges bool `config:"report_changes"` + ReportChanges bool `config:"packages.report_changes"` } // Validate validates the host metricset config. @@ -14,9 +14,6 @@ func (c *Config) Validate() error { return nil } -// NewDefaultConfig returns a default configuration for this module. -func NewDefaultConfig() Config { - return Config{ - ReportChanges: true, - } +var defaultConfig = Config{ + ReportChanges: true, } diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index 6598b5a2e7f9..622f5e590dbf 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" - "github.com/elastic/beats/x-pack/auditbeat/module/system/config" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/go-sysinfo" @@ -41,7 +40,7 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { mb.BaseMetricSet - config config.Config + config Config cache *cache.Cache log *logp.Logger } @@ -83,7 +82,7 @@ func (pkg Package) toMapStr() common.MapStr { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) - config := config.NewDefaultConfig() + config := defaultConfig if err := base.Module().UnpackConfig(&config); err != nil { return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) } diff --git a/x-pack/auditbeat/module/system/processes/config.go b/x-pack/auditbeat/module/system/processes/config.go new file mode 100644 index 000000000000..4eff02a211ef --- /dev/null +++ b/x-pack/auditbeat/module/system/processes/config.go @@ -0,0 +1,19 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package processes + +// Config defines the host metricset's configuration options. +type Config struct { + ReportChanges bool `config:"processes.report_changes"` +} + +// Validate validates the host metricset config. +func (c *Config) Validate() error { + return nil +} + +var defaultConfig = Config{ + ReportChanges: true, +} diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index abf4c4b80c76..8b57fc5e761c 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -13,7 +13,6 @@ import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" - "github.com/elastic/beats/x-pack/auditbeat/module/system/config" "github.com/elastic/go-sysinfo/types" "github.com/elastic/beats/libbeat/logp" @@ -35,7 +34,7 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { mb.BaseMetricSet - config config.Config + config Config cache *cache.Cache log *logp.Logger } @@ -69,7 +68,7 @@ func (pInfo ProcessInfo) toMapStr() common.MapStr { func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) - config := config.NewDefaultConfig() + config := defaultConfig if err := base.Module().UnpackConfig(&config); err != nil { return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) } From 05b8ab4e813e4b9470a770049fd98e536b49c140 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Fri, 28 Sep 2018 14:18:20 +0100 Subject: [PATCH 13/21] Change to xxhash. --- x-pack/auditbeat/cache/cache.go | 6 +++--- x-pack/auditbeat/cache/cache_test.go | 7 +++++-- x-pack/auditbeat/module/system/packages/packages.go | 10 +++++++--- x-pack/auditbeat/module/system/processes/processes.go | 9 ++++++--- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/x-pack/auditbeat/cache/cache.go b/x-pack/auditbeat/cache/cache.go index b63fcdface4d..ad3bbc8926ef 100644 --- a/x-pack/auditbeat/cache/cache.go +++ b/x-pack/auditbeat/cache/cache.go @@ -6,18 +6,18 @@ package cache // Cache is just a map being used as a cache. type Cache struct { - hashMap map[string]Cacheable + hashMap map[uint64]Cacheable } // Cacheable is the interface items stored in Cache need to implement. type Cacheable interface { - Hash() string + Hash() uint64 } // New creates a new cache. func New() *Cache { return &Cache{ - hashMap: make(map[string]Cacheable), + hashMap: make(map[uint64]Cacheable), } } diff --git a/x-pack/auditbeat/cache/cache_test.go b/x-pack/auditbeat/cache/cache_test.go index 7dc12dd404b7..5e921a134b58 100644 --- a/x-pack/auditbeat/cache/cache_test.go +++ b/x-pack/auditbeat/cache/cache_test.go @@ -7,6 +7,7 @@ package cache import ( "testing" + "github.com/OneOfOne/xxhash" "github.com/stretchr/testify/assert" ) @@ -14,8 +15,10 @@ type CacheTestItem struct { s string } -func (item CacheTestItem) Hash() string { - return item.s + item.s +func (item CacheTestItem) Hash() uint64 { + h := xxhash.New64() + h.WriteString(item.s) + return h.Sum64() } func TestCache(t *testing.T) { diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index 622f5e590dbf..028c68b4c820 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -22,6 +22,8 @@ import ( "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/OneOfOne/xxhash" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/go-sysinfo" ) @@ -59,9 +61,11 @@ type Package struct { } // Hash creates a hash for Package. -func (pkg Package) Hash() string { - // Could use real hash e.g. FNV if there is an advantage - return pkg.Name + pkg.InstallTime.String() +func (pkg Package) Hash() uint64 { + h := xxhash.New64() + h.WriteString(pkg.Name) + h.WriteString(pkg.InstallTime.String()) + return h.Sum64() } func (pkg Package) toMapStr() common.MapStr { diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index 8b57fc5e761c..87e2450ceb75 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -7,6 +7,7 @@ package processes import ( "strconv" + "github.com/OneOfOne/xxhash" "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -45,9 +46,11 @@ type ProcessInfo struct { } // Hash creates a hash for ProcessInfo. -func (pInfo ProcessInfo) Hash() string { - // Could use real hash e.g. FNV if there is an advantage - return strconv.Itoa(pInfo.PID) + pInfo.StartTime.String() +func (pInfo ProcessInfo) Hash() uint64 { + h := xxhash.New64() + h.WriteString(strconv.Itoa(pInfo.PID)) + h.WriteString(pInfo.StartTime.String()) + return h.Sum64() } func (pInfo ProcessInfo) toMapStr() common.MapStr { From ecfe35430f0783754978c9581aa0a02419e8a427 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Fri, 28 Sep 2018 16:10:59 +0100 Subject: [PATCH 14/21] Various changes to packages. --- .../module/system/packages/packages.go | 153 ++++++++++-------- 1 file changed, 85 insertions(+), 68 deletions(-) diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index 028c68b4c820..f4ce8f04d2c7 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/go-sysinfo/types" "github.com/OneOfOne/xxhash" @@ -31,6 +32,10 @@ import ( const ( moduleName = "system" metricsetName = "packages" + + redhat = "redhat" + debian = "debian" + darwin = "darwin" ) func init() { @@ -42,9 +47,10 @@ func init() { // MetricSet collects data about the host. type MetricSet struct { mb.BaseMetricSet - config Config - cache *cache.Cache - log *logp.Logger + config Config + osFamily string + cache *cache.Cache + log *logp.Logger } // Package represents information for a package. @@ -70,18 +76,32 @@ func (pkg Package) Hash() uint64 { func (pkg Package) toMapStr() common.MapStr { return common.MapStr{ - "package.name": pkg.Name, - "package.version": pkg.Version, - "package.release": pkg.Release, - "package.arch": pkg.Arch, - "package.license": pkg.License, - "package.installtime": pkg.InstallTime, - "package.size": pkg.Size, - "package.summary": pkg.Summary, - "package.url": pkg.URL, + "name": pkg.Name, + "version": pkg.Version, + "release": pkg.Release, + "arch": pkg.Arch, + "license": pkg.License, + "installtime": pkg.InstallTime, + "size": pkg.Size, + "summary": pkg.Summary, + "url": pkg.URL, } } +func getOS() (*types.OSInfo, error) { + host, err := sysinfo.Host() + if err != nil { + return nil, errors.Wrap(err, "error getting the OS") + } + + hostInfo := host.Info() + if hostInfo.OS == nil { + return nil, errors.New("no host info") + } + + return hostInfo.OS, nil +} + // New constructs a new MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) @@ -97,6 +117,17 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { log: logp.NewLogger(moduleName), } + if os, err := getOS(); err == nil { + switch os.Family { + case redhat, debian, darwin: + ms.osFamily = os.Family + default: + return nil, fmt.Errorf("this metricset does not support OS family %v", os.Family) + } + } else if err != nil { + return nil, err + } + if config.ReportChanges { ms.cache = cache.New() } @@ -106,12 +137,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch collects data about the host. It is invoked periodically. func (ms *MetricSet) Fetch(report mb.ReporterV2) { - packages, err := getPackages() + packages, err := getPackages(ms.osFamily) if err != nil { ms.log.Error(err) report.Error(err) - } - if packages == nil { return } @@ -121,8 +150,10 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { for _, pkgInfo := range installed { report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "installed", - "packages": pkgInfo.(Package).toMapStr(), + "status": "installed", + "packages": common.MapStr{ + "package": pkgInfo.(Package).toMapStr(), + }, }, }) } @@ -130,8 +161,10 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { for _, pkgInfo := range removed { report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "removed", - "packages": pkgInfo.(Package).toMapStr(), + "status": "removed", + "packages": common.MapStr{ + "package": pkgInfo.(Package).toMapStr(), + }, }, }) } @@ -140,7 +173,9 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { var pkgInfos []common.MapStr for _, pkgInfo := range packages { - pkgInfos = append(pkgInfos, pkgInfo.(Package).toMapStr()) + pkgInfos = append(pkgInfos, common.MapStr{ + "package": pkgInfo.(Package).toMapStr(), + }) } report.Event(mb.Event{ @@ -156,61 +191,49 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { } } -func getPackages() ([]cache.Cacheable, error) { - host, err := sysinfo.Host() - if err != nil { - return nil, errors.Wrap(err, "Error getting the OS") - } - - hostInfo := host.Info() - if hostInfo.OS == nil { - return nil, errors.New("No host info") - } - - var packages []cache.Cacheable - - switch hostInfo.OS.Family { - case "redhat": +func getPackages(osFamily string) (packages []cache.Cacheable, err error) { + switch osFamily { + case redhat: packages, err = listRPMPackages() if err != nil { - err = errors.Wrap(err, "Error getting RPM packages") + err = errors.Wrap(err, "error getting RPM packages") } - case "debian": + case debian: packages, err = listDebPackages() if err != nil { - err = errors.Wrap(err, "Error getting DEB packages") + err = errors.Wrap(err, "error getting DEB packages") } - case "darwin": + case darwin: packages, err = listBrewPackages() if err != nil { - err = errors.Wrap(err, "Error getting Homebrew packages") + err = errors.Wrap(err, "error getting Homebrew packages") } default: - return nil, fmt.Errorf("No logic for getting packages for OS family %v", hostInfo.OS.Family) + panic("unknown OS - this should not have happened") } - return packages, err + return } /* The following functions copied from https://github.com/tsg/listpackages/blob/master/main.go */ func listRPMPackages() ([]cache.Cacheable, error) { - format := "%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}|%{LICENSE}|%{INSTALLTIME}|%{SIZE}|%{URL}|%{SUMMARY}\\n" + const format = "%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}|%{LICENSE}|%{INSTALLTIME}|%{SIZE}|%{URL}|%{SUMMARY}\\n" out, err := exec.Command("/usr/bin/rpm", "--qf", format, "-qa").Output() if err != nil { - return nil, fmt.Errorf("Error running rpm -qa command: %v", err) + return nil, errors.Wrapf(err, "error running rpm -qa command - output: %v", out) } lines := strings.Split(string(out), "\n") - packages := []cache.Cacheable{} + var packages []cache.Cacheable for _, line := range lines { if len(strings.TrimSpace(line)) == 0 { continue } words := strings.SplitN(line, "|", 9) if len(words) < 9 { - return nil, fmt.Errorf("Line '%s' doesn't have enough elements", line) + return nil, fmt.Errorf("line '%s' doesn't have enough elements", line) } pkg := Package{ Name: words[0], @@ -225,31 +248,30 @@ func listRPMPackages() ([]cache.Cacheable, error) { } ts, err := strconv.ParseInt(words[5], 10, 64) if err != nil { - return nil, fmt.Errorf("Error converting %s to string: %v", words[5], err) + return nil, errors.Wrapf(err, "error converting %s to string", words[5]) } pkg.InstallTime = time.Unix(ts, 0) pkg.Size, err = strconv.ParseUint(words[6], 10, 64) if err != nil { - return nil, fmt.Errorf("Error converting %s to string: %v", words[6], err) + return nil, errors.Wrapf(err, "error converting %s to string", words[6]) } packages = append(packages, pkg) - } return packages, nil } func listDebPackages() ([]cache.Cacheable, error) { - statusFile := "/var/lib/dpkg/status" + const statusFile = "/var/lib/dpkg/status" file, err := os.Open(statusFile) if err != nil { - return nil, fmt.Errorf("Error opening '%s': %v", statusFile, err) + return nil, errors.Wrapf(err, "error opening '%s'", statusFile) } defer file.Close() - packages := []cache.Cacheable{} + var packages []cache.Cacheable pkg := &Package{} scanner := bufio.NewScanner(file) for scanner.Scan() { @@ -266,7 +288,7 @@ func listDebPackages() ([]cache.Cacheable, error) { } words := strings.SplitN(line, ":", 2) if len(words) != 2 { - return nil, fmt.Errorf("The following line was unexpected (no ':' found): '%s'", line) + return nil, fmt.Errorf("the following line was unexpected (no ':' found): '%s'", line) } value := strings.TrimSpace(words[1]) switch strings.ToLower(words[0]) { @@ -281,35 +303,29 @@ func listDebPackages() ([]cache.Cacheable, error) { case "installed-size": pkg.Size, err = strconv.ParseUint(value, 10, 64) if err != nil { - return nil, fmt.Errorf("Error converting %s to int: %v", value, err) + return nil, errors.Wrapf(err, "error converting %s to int", value) } default: continue } } if err = scanner.Err(); err != nil { - return nil, fmt.Errorf("Error scanning file: %v", err) + return nil, errors.Wrap(err, "error scanning file") } return packages, nil } func listBrewPackages() ([]cache.Cacheable, error) { - cellarPath := "/usr/local/Cellar" - - cellarInfo, err := os.Stat(cellarPath) - if err != nil { - return nil, fmt.Errorf("Homebrew cellar not found in %s: %v", cellarPath, err) - } - if !cellarInfo.IsDir() { - return nil, fmt.Errorf("%s is not a directory", cellarPath) - } + const cellarPath = "/usr/local/Cellar" packageDirs, err := ioutil.ReadDir(cellarPath) - if err != nil { - return nil, fmt.Errorf("Error reading directory %s: %v", cellarPath, err) + if os.IsNotExist(err) { + return nil, errors.Wrapf(err, "%s does not exist - is Homebrew installed?", cellarPath) + } else if err != nil { + return nil, errors.Wrapf(err, "error reading directory %s", cellarPath) } - packages := []cache.Cacheable{} + var packages []cache.Cacheable for _, packageDir := range packageDirs { if !packageDir.IsDir() { continue @@ -317,8 +333,9 @@ func listBrewPackages() ([]cache.Cacheable, error) { pkgPath := path.Join(cellarPath, packageDir.Name()) versions, err := ioutil.ReadDir(pkgPath) if err != nil { - return nil, fmt.Errorf("Error reading directory: %s: %v", pkgPath, err) + return nil, errors.Wrapf(err, "error reading directory: %s", pkgPath) } + for _, version := range versions { if !version.IsDir() { continue From e9452ff5fb89dc596941710cfbed260930c2c259 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Mon, 1 Oct 2018 11:50:24 +0100 Subject: [PATCH 15/21] Use *{Package|ProcessInfo} instead of Cacheable. --- .../module/system/packages/packages.go | 36 ++++++++++++------- .../module/system/processes/processes.go | 22 ++++++++---- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index f4ce8f04d2c7..f0ce01e0d087 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -145,7 +145,7 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { } if ms.cache != nil && !ms.cache.IsEmpty() { - installed, removed := ms.cache.DiffAndUpdateCache(packages) + installed, removed := ms.cache.DiffAndUpdateCache(convertToCacheable(packages)) for _, pkgInfo := range installed { report.Event(mb.Event{ @@ -174,7 +174,7 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { for _, pkgInfo := range packages { pkgInfos = append(pkgInfos, common.MapStr{ - "package": pkgInfo.(Package).toMapStr(), + "package": pkgInfo.toMapStr(), }) } @@ -186,12 +186,22 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { if ms.cache != nil { // This will initialize the cache with the current packages - ms.cache.DiffAndUpdateCache(packages) + ms.cache.DiffAndUpdateCache(convertToCacheable(packages)) } } } -func getPackages(osFamily string) (packages []cache.Cacheable, err error) { +func convertToCacheable(packages []*Package) []cache.Cacheable { + c := make([]cache.Cacheable, 0, len(packages)) + + for _, p := range packages { + c = append(c, p) + } + + return c +} + +func getPackages(osFamily string) (packages []*Package, err error) { switch osFamily { case redhat: packages, err = listRPMPackages() @@ -218,7 +228,7 @@ func getPackages(osFamily string) (packages []cache.Cacheable, err error) { /* The following functions copied from https://github.com/tsg/listpackages/blob/master/main.go */ -func listRPMPackages() ([]cache.Cacheable, error) { +func listRPMPackages() ([]*Package, error) { const format = "%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}|%{LICENSE}|%{INSTALLTIME}|%{SIZE}|%{URL}|%{SUMMARY}\\n" out, err := exec.Command("/usr/bin/rpm", "--qf", format, "-qa").Output() if err != nil { @@ -226,7 +236,7 @@ func listRPMPackages() ([]cache.Cacheable, error) { } lines := strings.Split(string(out), "\n") - var packages []cache.Cacheable + var packages []*Package for _, line := range lines { if len(strings.TrimSpace(line)) == 0 { continue @@ -235,7 +245,7 @@ func listRPMPackages() ([]cache.Cacheable, error) { if len(words) < 9 { return nil, fmt.Errorf("line '%s' doesn't have enough elements", line) } - pkg := Package{ + pkg := &Package{ Name: words[0], Version: words[1], Release: words[2], @@ -263,7 +273,7 @@ func listRPMPackages() ([]cache.Cacheable, error) { return packages, nil } -func listDebPackages() ([]cache.Cacheable, error) { +func listDebPackages() ([]*Package, error) { const statusFile = "/var/lib/dpkg/status" file, err := os.Open(statusFile) if err != nil { @@ -271,14 +281,14 @@ func listDebPackages() ([]cache.Cacheable, error) { } defer file.Close() - var packages []cache.Cacheable + var packages []*Package pkg := &Package{} scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() if len(strings.TrimSpace(line)) == 0 { // empty line signals new package - packages = append(packages, *pkg) + packages = append(packages, pkg) pkg = &Package{} continue } @@ -315,7 +325,7 @@ func listDebPackages() ([]cache.Cacheable, error) { return packages, nil } -func listBrewPackages() ([]cache.Cacheable, error) { +func listBrewPackages() ([]*Package, error) { const cellarPath = "/usr/local/Cellar" packageDirs, err := ioutil.ReadDir(cellarPath) @@ -325,7 +335,7 @@ func listBrewPackages() ([]cache.Cacheable, error) { return nil, errors.Wrapf(err, "error reading directory %s", cellarPath) } - var packages []cache.Cacheable + var packages []*Package for _, packageDir := range packageDirs { if !packageDir.IsDir() { continue @@ -340,7 +350,7 @@ func listBrewPackages() ([]cache.Cacheable, error) { if !version.IsDir() { continue } - pkg := Package{ + pkg := &Package{ Name: packageDir.Name(), Version: version.Name(), InstallTime: version.ModTime(), diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index 87e2450ceb75..e440c04141ff 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -104,7 +104,7 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { } if ms.cache != nil && !ms.cache.IsEmpty() { - started, stopped := ms.cache.DiffAndUpdateCache(processInfos) + started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos)) for _, pInfo := range started { report.Event(mb.Event{ @@ -128,7 +128,7 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { var processEvents []common.MapStr for _, pInfo := range processInfos { - processEvents = append(processEvents, pInfo.(ProcessInfo).toMapStr()) + processEvents = append(processEvents, pInfo.toMapStr()) } report.Event(mb.Event{ @@ -139,12 +139,22 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { if ms.cache != nil { // This will initialize the cache with the current processes - ms.cache.DiffAndUpdateCache(processInfos) + ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos)) } } } -func (ms *MetricSet) getProcessInfos() ([]cache.Cacheable, []error) { +func convertToCacheable(processInfos []*ProcessInfo) []cache.Cacheable { + c := make([]cache.Cacheable, 0, len(processInfos)) + + for _, p := range processInfos { + c = append(c, p) + } + + return c +} + +func (ms *MetricSet) getProcessInfos() ([]*ProcessInfo, []error) { // TODO: Implement Processes() in go-sysinfo // e.g. https://github.com/elastic/go-sysinfo/blob/master/providers/darwin/process_darwin_amd64.go#L41 pids, err := process.Pids() @@ -152,13 +162,13 @@ func (ms *MetricSet) getProcessInfos() ([]cache.Cacheable, []error) { return nil, []error{errors.Wrap(err, "Failed to fetch the list of PIDs")} } - var processInfos []cache.Cacheable + var processInfos []*ProcessInfo var errorList []error for _, pid := range pids { if p, err := sysinfo.Process(pid); err == nil { if pInfo, err := p.Info(); err == nil { - processInfos = append(processInfos, ProcessInfo{pInfo}) + processInfos = append(processInfos, &ProcessInfo{pInfo}) } else { errorList = append(errorList, errors.Wrap(err, "Failed to load process information")) } From 681cc68ac7a956b12b371b84c05ef3c1a8c8b133 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Mon, 1 Oct 2018 11:57:31 +0100 Subject: [PATCH 16/21] Add .gitignore in x-pack/auditbeat/. --- x-pack/auditbeat/.gitignore | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 x-pack/auditbeat/.gitignore diff --git a/x-pack/auditbeat/.gitignore b/x-pack/auditbeat/.gitignore new file mode 100644 index 000000000000..1a3313209869 --- /dev/null +++ b/x-pack/auditbeat/.gitignore @@ -0,0 +1,4 @@ +/auditbeat +/auditbeat.test +/data +/fields.yml From 973241ca6c8810e7a7f7cff5d5c7f80b1f5416f1 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Mon, 1 Oct 2018 13:44:28 +0100 Subject: [PATCH 17/21] Collect information about network interfaces. --- .../module/system/host/_meta/fields.yml | 84 +++++++++++++------ x-pack/auditbeat/module/system/host/host.go | 76 ++++++++++++++++- 2 files changed, 131 insertions(+), 29 deletions(-) diff --git a/x-pack/auditbeat/module/system/host/_meta/fields.yml b/x-pack/auditbeat/module/system/host/_meta/fields.yml index 99be5222c3d6..9729e10e6df6 100644 --- a/x-pack/auditbeat/module/system/host/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/host/_meta/fields.yml @@ -32,36 +32,66 @@ type: keyword description: > Host ID. - - name: ip - type: ip - description: > - List of IP addresses on this host. - - name: mac - type: keyword - description: > - List of MAC addresses on this host. - name: architecture type: keyword description: > Host architecture (e.g. x86_64). - - name: os.platform - type: keyword - description: > - OS platform (e.g. centos, ubuntu, windows). - - name: os.name - type: keyword + - name: os + type: group description: > - OS name (e.g. Mac OS X). - - name: os.family - type: keyword - description: > - OS family (e.g. redhat, debian, freebsd, windows). - - name: os.version - type: keyword - description: > - OS version. - - name: os.kernel - type: keyword + `os` contains information about the operating system. + fields: + - name: platform + type: keyword + description: > + OS platform (e.g. centos, ubuntu, windows). + - name: name + type: keyword + description: > + OS name (e.g. Mac OS X). + - name: family + type: keyword + description: > + OS family (e.g. redhat, debian, freebsd, windows). + - name: version + type: keyword + description: > + OS version. + - name: kernel + type: keyword + description: > + The operating system's kernel version. + - name: network + type: group description: > - The operating system's kernel version. - + `network` contains network information from the system. + fields: + - name: interfaces + type: array + description: > + `interfaces` contains information about network interfaces. + fields: + - name: index + type: integer + description: > + Index of the interface. + - name: mtu + type: integer + description: > + Maximum transmission unit. + - name: name + type: keyword + description: > + Interface name. + - name: mac + type: keyword + description: > + MAC address. + - name: flags + type: text + description: > + Interface flags. + - name: ips + type: ip + description: > + IP addresses. diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index 242fdcfa5014..dc7c6d270386 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -5,6 +5,8 @@ package host import ( + "net" + "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" @@ -52,6 +54,19 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { return } + networkInterfaces, err := getNetworkInterfaces() + if err != nil { + errW := errors.Wrap(err, "Failed to load network interface information") + ms.log.Error(errW) + report.Error(errW) + return + } + + var networkInterfaceMapStr []common.MapStr + for _, ifc := range networkInterfaces { + networkInterfaceMapStr = append(networkInterfaceMapStr, ifc.toMapStr()) + } + report.Event(mb.Event{ MetricSetFields: common.MapStr{ // https://github.com/elastic/ecs#-host-fields @@ -62,8 +77,6 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { "timezone.offset.sec": host.Info().TimezoneOffsetSec, "name": host.Info().Hostname, "id": host.Info().UniqueID, - "ip": host.Info().IPs, - "mac": host.Info().MACs, // TODO "host.type": ? "architecture": host.Info().Architecture, @@ -75,6 +88,65 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { "version": host.Info().OS.Version, "kernel": host.Info().KernelVersion, }, + + "network": common.MapStr{ + "interfaces": networkInterfaceMapStr, + }, }, }) } + +type NetworkInterface struct { + net.Interface + + ips []net.IP +} + +func (ifc NetworkInterface) toMapStr() common.MapStr { + return common.MapStr{ + "index": ifc.Index, + "mtu": ifc.MTU, + "name": ifc.Name, + "mac": ifc.HardwareAddr.String(), + "flags": ifc.Flags.String(), + "ips": ifc.ips, + } +} + +// getInterfaces fetches information about the system's network interfaces. +// TODO: Move to go-sysinfo? +func getNetworkInterfaces() ([]NetworkInterface, error) { + ifcs, err := net.Interfaces() + if err != nil { + return nil, err + } + + var networkInterfaces []NetworkInterface + + for _, ifc := range ifcs { + addrs, err := ifc.Addrs() + if err != nil { + return nil, err + } + + var ips []net.IP + for _, addr := range addrs { + ip, _, err := net.ParseCIDR(addr.String()) + if err != nil { + return nil, err + } + + ips = append(ips, ip) + } + + isLoopback := ifc.Flags&net.FlagLoopback != 0 + if !isLoopback { + networkInterfaces = append(networkInterfaces, NetworkInterface{ + ifc, + ips, + }) + } + } + + return networkInterfaces, nil +} From 92e1d7208ffa39f6e8d7bba0c732ee80ec333fbe Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Tue, 2 Oct 2018 10:57:29 +0100 Subject: [PATCH 18/21] Add TestData functions, move some fields around. --- .../module/system/host/_meta/data.json | 46 +++++++++++++++++++ .../module/system/host/_meta/fields.yml | 2 +- x-pack/auditbeat/module/system/host/host.go | 2 +- .../auditbeat/module/system/host/host_test.go | 26 +++++++++++ .../module/system/packages/_meta/data.json | 30 ++++++++++++ .../module/system/packages/_meta/fields.yml | 12 ++--- .../module/system/packages/packages.go | 25 +++++----- .../module/system/packages/packages_test.go | 26 +++++++++++ .../module/system/processes/_meta/data.json | 30 ++++++++++++ .../module/system/processes/_meta/fields.yml | 14 +++--- .../module/system/processes/processes.go | 34 ++++++++------ .../module/system/processes/processes_test.go | 26 +++++++++++ .../auditbeat/tests/system/auditbeat_xpack.py | 5 +- .../auditbeat/tests/system/test_metricsets.py | 6 +-- 14 files changed, 239 insertions(+), 45 deletions(-) create mode 100644 x-pack/auditbeat/module/system/host/_meta/data.json create mode 100644 x-pack/auditbeat/module/system/host/host_test.go create mode 100644 x-pack/auditbeat/module/system/packages/_meta/data.json create mode 100644 x-pack/auditbeat/module/system/packages/packages_test.go create mode 100644 x-pack/auditbeat/module/system/processes/_meta/data.json create mode 100644 x-pack/auditbeat/module/system/processes/processes_test.go diff --git a/x-pack/auditbeat/module/system/host/_meta/data.json b/x-pack/auditbeat/module/system/host/_meta/data.json new file mode 100644 index 000000000000..0cd55dd04b46 --- /dev/null +++ b/x-pack/auditbeat/module/system/host/_meta/data.json @@ -0,0 +1,46 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "beat": { + "hostname": "host.example.com", + "name": "host.example.com" + }, + "metricset": { + "module": "system", + "name": "host", + "rtt": 115 + }, + "system": { + "host": { + "architecture": "x86_64", + "boottime": "2018-10-01T13:33:02Z", + "containerized": false, + "id": "87778e62461b4d609aee5a20f2ec4be6", + "name": "ubuntu-bionic", + "network": { + "interfaces": [ + { + "flags": "up|broadcast|multicast", + "index": 2, + "ip": [ + "10.0.2.15", + "fe80::2d:fdff:fe81:e747" + ], + "mac": "02:2d:fd:81:e7:47", + "mtu": 1500, + "name": "enp0s3" + } + ] + }, + "os": { + "family": "debian", + "kernel": "4.15.0-34-generic", + "name": "Ubuntu", + "platform": "ubuntu", + "version": "18.04.1 LTS (Bionic Beaver)" + }, + "timezone.name": "UTC", + "timezone.offset.sec": 0, + "uptime": 222611977726 + } + } +} diff --git a/x-pack/auditbeat/module/system/host/_meta/fields.yml b/x-pack/auditbeat/module/system/host/_meta/fields.yml index 9729e10e6df6..2e9f2f479722 100644 --- a/x-pack/auditbeat/module/system/host/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/host/_meta/fields.yml @@ -91,7 +91,7 @@ type: text description: > Interface flags. - - name: ips + - name: ip type: ip description: > IP addresses. diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index dc7c6d270386..7801d7a5bb91 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -109,7 +109,7 @@ func (ifc NetworkInterface) toMapStr() common.MapStr { "name": ifc.Name, "mac": ifc.HardwareAddr.String(), "flags": ifc.Flags.String(), - "ips": ifc.ips, + "ip": ifc.ips, } } diff --git a/x-pack/auditbeat/module/system/host/host_test.go b/x-pack/auditbeat/module/system/host/host_test.go new file mode 100644 index 000000000000..bd356379c992 --- /dev/null +++ b/x-pack/auditbeat/module/system/host/host_test.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package host + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + err := mbtest.WriteEventsReporterV2(f, t, "") + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "system", + "metricsets": []string{"host"}, + } +} diff --git a/x-pack/auditbeat/module/system/packages/_meta/data.json b/x-pack/auditbeat/module/system/packages/_meta/data.json new file mode 100644 index 000000000000..36bbb2fd0bac --- /dev/null +++ b/x-pack/auditbeat/module/system/packages/_meta/data.json @@ -0,0 +1,30 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "beat": { + "hostname": "host.example.com", + "name": "host.example.com" + }, + "metricset": { + "module": "system", + "name": "packages", + "rtt": 115 + }, + "system": { + "packages": { + "package": [ + { + "arch": "amd64", + "installtime": "0001-01-01T00:00:00Z", + "license": "", + "name": "vim-tiny", + "release": "", + "size": 1271, + "status": "installed", + "summary": "Vi IMproved - enhanced vi editor - compact version", + "url": "", + "version": "2:8.0.1453-1ubuntu1" + } + ] + } + } +} diff --git a/x-pack/auditbeat/module/system/packages/_meta/fields.yml b/x-pack/auditbeat/module/system/packages/_meta/fields.yml index 383dd84becf6..f57093bc150d 100644 --- a/x-pack/auditbeat/module/system/packages/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/packages/_meta/fields.yml @@ -4,15 +4,15 @@ `packages` contains information about installed packages. release: experimental fields: - - name: status - type: keyword - description: > - Package change - `installed` or `removed`. - - name: packages + - name: package type: array description: > - List of packages. + One or more packages. fields: + - name: status + type: keyword + description: > + Package change - `new`, `installed` or `removed`. - name: package.name type: keyword description: > diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index f0ce01e0d087..2f80fa132f8d 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -148,23 +148,23 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { installed, removed := ms.cache.DiffAndUpdateCache(convertToCacheable(packages)) for _, pkgInfo := range installed { + pkgInfoMapStr := pkgInfo.(*Package).toMapStr() + pkgInfoMapStr.Put("status", "new") + report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "installed", - "packages": common.MapStr{ - "package": pkgInfo.(Package).toMapStr(), - }, + "package": pkgInfoMapStr, }, }) } for _, pkgInfo := range removed { + pkgInfoMapStr := pkgInfo.(*Package).toMapStr() + pkgInfoMapStr.Put("status", "removed") + report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "removed", - "packages": common.MapStr{ - "package": pkgInfo.(Package).toMapStr(), - }, + "package": pkgInfoMapStr, }, }) } @@ -173,14 +173,15 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { var pkgInfos []common.MapStr for _, pkgInfo := range packages { - pkgInfos = append(pkgInfos, common.MapStr{ - "package": pkgInfo.toMapStr(), - }) + pkgInfoMapStr := pkgInfo.toMapStr() + pkgInfoMapStr.Put("status", "installed") + + pkgInfos = append(pkgInfos, pkgInfoMapStr) } report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "packages": pkgInfos, + "package": pkgInfos, }, }) diff --git a/x-pack/auditbeat/module/system/packages/packages_test.go b/x-pack/auditbeat/module/system/packages/packages_test.go new file mode 100644 index 000000000000..f8078d078cc1 --- /dev/null +++ b/x-pack/auditbeat/module/system/packages/packages_test.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package packages + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + err := mbtest.WriteEventsReporterV2(f, t, "") + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "system", + "metricsets": []string{"packages"}, + } +} diff --git a/x-pack/auditbeat/module/system/processes/_meta/data.json b/x-pack/auditbeat/module/system/processes/_meta/data.json new file mode 100644 index 000000000000..183bc0ada59b --- /dev/null +++ b/x-pack/auditbeat/module/system/processes/_meta/data.json @@ -0,0 +1,30 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "beat": { + "hostname": "host.example.com", + "name": "host.example.com" + }, + "metricset": { + "module": "system", + "name": "processes", + "rtt": 115 + }, + "system": { + "processes": { + "process": [ + { + "args": [ + "/sbin/init" + ], + "cwd": "/", + "exe": "/lib/systemd/systemd", + "name": "systemd", + "pid": 1, + "ppid": 0, + "starttime": "2018-10-01T14:21:49.06Z", + "status": "running" + } + ] + } + } +} diff --git a/x-pack/auditbeat/module/system/processes/_meta/fields.yml b/x-pack/auditbeat/module/system/processes/_meta/fields.yml index c65f644ba97e..2778e2722051 100644 --- a/x-pack/auditbeat/module/system/processes/_meta/fields.yml +++ b/x-pack/auditbeat/module/system/processes/_meta/fields.yml @@ -4,15 +4,15 @@ `processes` contains process information. release: experimental fields: - - name: status - type: keyword - description: > - Process change - `started` or `stopped`. - - name: processes + - name: process type: array description: > - List of processes. + One or more processes. fields: + - name: status + type: keyword + description: > + Process status - `started`, `running`, or `stopped`. - name: process.name type: keyword description: > @@ -40,4 +40,4 @@ - name: process.starttime type: date description: > - Start time of the process. \ No newline at end of file + Start time of the process. diff --git a/x-pack/auditbeat/module/system/processes/processes.go b/x-pack/auditbeat/module/system/processes/processes.go index e440c04141ff..76bf7bc24698 100644 --- a/x-pack/auditbeat/module/system/processes/processes.go +++ b/x-pack/auditbeat/module/system/processes/processes.go @@ -56,14 +56,13 @@ func (pInfo ProcessInfo) Hash() uint64 { func (pInfo ProcessInfo) toMapStr() common.MapStr { return common.MapStr{ // https://github.com/elastic/ecs#-process-fields - "process.name": pInfo.Name, - "process.args": pInfo.Args, - "process.pid": pInfo.PID, - "process.ppid": pInfo.PPID, - - "process.cwd": pInfo.CWD, - "process.exe": pInfo.Exe, - "process.starttime": pInfo.StartTime, + "name": pInfo.Name, + "args": pInfo.Args, + "pid": pInfo.PID, + "ppid": pInfo.PPID, + "cwd": pInfo.CWD, + "exe": pInfo.Exe, + "starttime": pInfo.StartTime, } } @@ -107,19 +106,23 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { started, stopped := ms.cache.DiffAndUpdateCache(convertToCacheable(processInfos)) for _, pInfo := range started { + pInfoMapStr := pInfo.(*ProcessInfo).toMapStr() + pInfoMapStr.Put("status", "started") + report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "started", - "processes": pInfo.(ProcessInfo).toMapStr(), + "process": pInfoMapStr, }, }) } for _, pInfo := range stopped { + pInfoMapStr := pInfo.(*ProcessInfo).toMapStr() + pInfoMapStr.Put("status", "stopped") + report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "status": "stopped", - "processes": pInfo.(ProcessInfo).toMapStr(), + "process": pInfoMapStr, }, }) } @@ -128,12 +131,15 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { var processEvents []common.MapStr for _, pInfo := range processInfos { - processEvents = append(processEvents, pInfo.toMapStr()) + pInfoMapStr := pInfo.toMapStr() + pInfoMapStr.Put("status", "running") + + processEvents = append(processEvents, pInfoMapStr) } report.Event(mb.Event{ MetricSetFields: common.MapStr{ - "processes": processEvents, + "process": processEvents, }, }) diff --git a/x-pack/auditbeat/module/system/processes/processes_test.go b/x-pack/auditbeat/module/system/processes/processes_test.go new file mode 100644 index 000000000000..c14c0afb9958 --- /dev/null +++ b/x-pack/auditbeat/module/system/processes/processes_test.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package processes + +import ( + "testing" + + mbtest "github.com/elastic/beats/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + err := mbtest.WriteEventsReporterV2(f, t, "") + if err != nil { + t.Fatal("write", err) + } +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "system", + "metricsets": []string{"processes"}, + } +} diff --git a/x-pack/auditbeat/tests/system/auditbeat_xpack.py b/x-pack/auditbeat/tests/system/auditbeat_xpack.py index eeb6a39b7d50..ede9115b67ac 100644 --- a/x-pack/auditbeat/tests/system/auditbeat_xpack.py +++ b/x-pack/auditbeat/tests/system/auditbeat_xpack.py @@ -47,6 +47,9 @@ def check_metricset(self, module, metricset, fields=[], warnings_allowed=False): evt = output[0] print(evt) - self.assertItemsEqual(self.de_dot(fields), evt.keys()) + flattened = self.flatten_object(evt, {}) + for f in fields: + if not f in flattened: + raise Exception("Field '{}' not found in event.".format(f)) self.assert_fields_are_documented(evt) diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py index e217cdbba362..c7bd108ca80e 100644 --- a/x-pack/auditbeat/tests/system/test_metricsets.py +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -18,7 +18,7 @@ def test_metricset_host(self): host metricset collects general information about a server. """ - fields = ["system.host.uptime", "system.host.ip", "system.host.os.name"] + fields = ["system.host.uptime", "system.host.network.interfaces", "system.host.os.name"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "host", COMMON_FIELDS + fields, warnings_allowed=True) @@ -28,7 +28,7 @@ def test_metricset_packages(self): packages metricset collects information about installed packages on a system. """ - fields = ["system.packages.packages.package.name"] + fields = ["system.packages.package"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "packages", COMMON_FIELDS + fields, warnings_allowed=True) @@ -39,7 +39,7 @@ def test_metricset_processes(self): processes metricset collects information about processes running on a system. """ - fields = ["system.processes.processes.process.name"] + fields = ["system.processes.process"] # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "processes", COMMON_FIELDS + fields, warnings_allowed=True) From d6548ef8d6e7563d25762f2c9a9d1b5a651abadc Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Tue, 2 Oct 2018 11:00:56 +0100 Subject: [PATCH 19/21] Add comment to NetworkInterface. --- x-pack/auditbeat/module/system/host/host.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/auditbeat/module/system/host/host.go b/x-pack/auditbeat/module/system/host/host.go index 7801d7a5bb91..94714d737ca0 100644 --- a/x-pack/auditbeat/module/system/host/host.go +++ b/x-pack/auditbeat/module/system/host/host.go @@ -96,6 +96,7 @@ func (ms *MetricSet) Fetch(report mb.ReporterV2) { }) } +// NetworkInterface represent information on a network interface. type NetworkInterface struct { net.Interface From 637d3df38937cec9a00bf34a5fe1e4566bd8c84c Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Wed, 3 Oct 2018 13:24:05 +0100 Subject: [PATCH 20/21] Cache Hash() in Cache. --- x-pack/auditbeat/cache/cache.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/x-pack/auditbeat/cache/cache.go b/x-pack/auditbeat/cache/cache.go index ad3bbc8926ef..6f213559b184 100644 --- a/x-pack/auditbeat/cache/cache.go +++ b/x-pack/auditbeat/cache/cache.go @@ -30,27 +30,28 @@ func (cache *Cache) IsEmpty() bool { // cache contents, and returns both items new to the cache and items that are in the cache // but missing in the new data. func (cache *Cache) DiffAndUpdateCache(current []Cacheable) (new, missing []interface{}) { + // Create hashmap of incoming Cacheables to avoid calling Hash() on each one many times + currentMap := make(map[uint64]Cacheable, len(current)) + + for _, currentValue := range current { + currentMap[currentValue.Hash()] = currentValue + } + // Check for and delete missing - what is no longer in current that was in the cache - for cacheKey, cacheValue := range cache.hashMap { - found := false - for _, currentValue := range current { - if currentValue.Hash() == cacheKey { - found = true - break - } - } + for cacheHash, cacheValue := range cache.hashMap { + _, found := currentMap[cacheHash] if !found { missing = append(missing, cacheValue) - delete(cache.hashMap, cacheKey) + delete(cache.hashMap, cacheHash) } } // Check for new - what is in current but not in cache - for _, currentValue := range current { - if _, contains := cache.hashMap[currentValue.Hash()]; !contains { + for currentHash, currentValue := range currentMap { + if _, contains := cache.hashMap[currentHash]; !contains { new = append(new, currentValue) - cache.hashMap[currentValue.Hash()] = currentValue + cache.hashMap[currentHash] = currentValue } } From 30ef25eb88b4cfcb1fd6e00ce597d65b16c4cd45 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Thu, 4 Oct 2018 12:58:17 +0100 Subject: [PATCH 21/21] Remove RPM code (for now). --- .../module/system/packages/packages.go | 55 +------------------ 1 file changed, 2 insertions(+), 53 deletions(-) diff --git a/x-pack/auditbeat/module/system/packages/packages.go b/x-pack/auditbeat/module/system/packages/packages.go index 2f80fa132f8d..d73ca70ba97f 100644 --- a/x-pack/auditbeat/module/system/packages/packages.go +++ b/x-pack/auditbeat/module/system/packages/packages.go @@ -9,7 +9,6 @@ import ( "fmt" "io/ioutil" "os" - "os/exec" "path" "strconv" "strings" @@ -205,10 +204,8 @@ func convertToCacheable(packages []*Package) []cache.Cacheable { func getPackages(osFamily string) (packages []*Package, err error) { switch osFamily { case redhat: - packages, err = listRPMPackages() - if err != nil { - err = errors.Wrap(err, "error getting RPM packages") - } + // TODO: Implement RPM + err = errors.New("RPM not yet supported") case debian: packages, err = listDebPackages() if err != nil { @@ -226,54 +223,6 @@ func getPackages(osFamily string) (packages []*Package, err error) { return } -/* -The following functions copied from https://github.com/tsg/listpackages/blob/master/main.go -*/ -func listRPMPackages() ([]*Package, error) { - const format = "%{NAME}|%{VERSION}|%{RELEASE}|%{ARCH}|%{LICENSE}|%{INSTALLTIME}|%{SIZE}|%{URL}|%{SUMMARY}\\n" - out, err := exec.Command("/usr/bin/rpm", "--qf", format, "-qa").Output() - if err != nil { - return nil, errors.Wrapf(err, "error running rpm -qa command - output: %v", out) - } - - lines := strings.Split(string(out), "\n") - var packages []*Package - for _, line := range lines { - if len(strings.TrimSpace(line)) == 0 { - continue - } - words := strings.SplitN(line, "|", 9) - if len(words) < 9 { - return nil, fmt.Errorf("line '%s' doesn't have enough elements", line) - } - pkg := &Package{ - Name: words[0], - Version: words[1], - Release: words[2], - Arch: words[3], - License: words[4], - // install time - 5 - // size - 6 - URL: words[7], - Summary: words[8], - } - ts, err := strconv.ParseInt(words[5], 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "error converting %s to string", words[5]) - } - pkg.InstallTime = time.Unix(ts, 0) - - pkg.Size, err = strconv.ParseUint(words[6], 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "error converting %s to string", words[6]) - } - - packages = append(packages, pkg) - } - - return packages, nil -} - func listDebPackages() ([]*Package, error) { const statusFile = "/var/lib/dpkg/status" file, err := os.Open(statusFile)