diff --git a/metricbeat/module/system/socket/listeners.go b/metricbeat/helper/socket/listeners.go similarity index 100% rename from metricbeat/module/system/socket/listeners.go rename to metricbeat/helper/socket/listeners.go diff --git a/metricbeat/module/system/socket/listeners_test.go b/metricbeat/helper/socket/listeners_test.go similarity index 100% rename from metricbeat/module/system/socket/listeners_test.go rename to metricbeat/helper/socket/listeners_test.go diff --git a/metricbeat/helper/socket/netlink.go b/metricbeat/helper/socket/netlink.go new file mode 100644 index 000000000000..130c147a49c4 --- /dev/null +++ b/metricbeat/helper/socket/netlink.go @@ -0,0 +1,54 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build linux + +package socket + +import ( + "os" + "sync/atomic" + + "github.com/pkg/errors" + + "github.com/elastic/gosigar/sys/linux" +) + +// NetlinkSession communicates with the kernel's netlink subsystem. +type NetlinkSession struct { + readBuffer []byte + seq uint32 +} + +// NewNetlinkSession creates a new netlink session. +func NewNetlinkSession() *NetlinkSession { + return &NetlinkSession{ + readBuffer: make([]byte, os.Getpagesize()), + } +} + +// GetSocketList retrieves the current list of sockets from the kernel. +func (session *NetlinkSession) GetSocketList() ([]*linux.InetDiagMsg, error) { + // Send request over netlink and parse responses. + req := linux.NewInetDiagReq() + req.Header.Seq = atomic.AddUint32(&session.seq, 1) + sockets, err := linux.NetlinkInetDiagWithBuf(req, session.readBuffer, nil) + if err != nil { + return nil, errors.Wrap(err, "failed requesting socket dump") + } + return sockets, nil +} diff --git a/metricbeat/module/system/socket/ptable.go b/metricbeat/helper/socket/ptable.go similarity index 100% rename from metricbeat/module/system/socket/ptable.go rename to metricbeat/helper/socket/ptable.go diff --git a/metricbeat/module/system/socket/socket.go b/metricbeat/module/system/socket/socket.go index 7c45df08b7a1..98bfe431e132 100644 --- a/metricbeat/module/system/socket/socket.go +++ b/metricbeat/module/system/socket/socket.go @@ -24,17 +24,17 @@ import ( "net" "os" "path/filepath" - "sync/atomic" "syscall" + "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + sock "github.com/elastic/beats/metricbeat/helper/socket" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/system" "github.com/elastic/gosigar/sys/linux" - - "github.com/pkg/errors" ) var ( @@ -50,14 +50,13 @@ func init() { type MetricSet struct { mb.BaseMetricSet - readBuffer []byte - seq uint32 - ptable *ProcTable + netlink *sock.NetlinkSession + ptable *sock.ProcTable euid int previousConns hashSet currentConns hashSet reverseLookup *ReverseLookupCache - listeners *ListenerTable + listeners *sock.ListenerTable users UserCache } @@ -72,7 +71,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, errors.New("unexpected module type") } - ptable, err := NewProcTable(filepath.Join(systemModule.HostFS, "/proc")) + ptable, err := sock.NewProcTable(filepath.Join(systemModule.HostFS, "/proc")) if err != nil { return nil, err } @@ -83,12 +82,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { m := &MetricSet{ BaseMetricSet: base, - readBuffer: make([]byte, os.Getpagesize()), + netlink: sock.NewNetlinkSession(), ptable: ptable, euid: os.Geteuid(), previousConns: hashSet{}, currentConns: hashSet{}, - listeners: NewListenerTable(), + listeners: sock.NewListenerTable(), users: NewUserCache(), } @@ -114,10 +113,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { debugf("process table refresh had failures: %v", err) } - // Send request over netlink and parse responses. - req := linux.NewInetDiagReq() - req.Header.Seq = atomic.AddUint32(&m.seq, 1) - sockets, err := linux.NetlinkInetDiagWithBuf(req, m.readBuffer, nil) + sockets, err := m.netlink.GetSocketList() if err != nil { return nil, errors.Wrap(err, "failed requesting socket dump") } @@ -192,7 +188,7 @@ func (m *MetricSet) enrichConnectionData(c *connection) { c.LocalIP, c.LocalPort, c.RemoteIP, c.RemotePort) // Reverse DNS lookup on the remote IP. - if m.reverseLookup != nil && c.Direction != Listening { + if m.reverseLookup != nil && c.Direction != sock.Listening { hostname, err := m.reverseLookup.Lookup(c.RemoteIP) if err != nil { c.DestHostError = err @@ -227,7 +223,7 @@ type connection struct { RemotePort int State linux.TCPState - Direction Direction + Direction sock.Direction DestHost string // Reverse lookup of dest IP. DestHostETLDPlusOne string diff --git a/x-pack/auditbeat/include/list.go b/x-pack/auditbeat/include/list.go index 4b69f25746a8..af846f4d8009 100644 --- a/x-pack/auditbeat/include/list.go +++ b/x-pack/auditbeat/include/list.go @@ -10,5 +10,6 @@ import ( _ "github.com/elastic/beats/x-pack/auditbeat/module/system/host" _ "github.com/elastic/beats/x-pack/auditbeat/module/system/packages" _ "github.com/elastic/beats/x-pack/auditbeat/module/system/processes" + _ "github.com/elastic/beats/x-pack/auditbeat/module/system/socket" _ "github.com/elastic/beats/x-pack/auditbeat/module/system/user" ) diff --git a/x-pack/auditbeat/module/system/_meta/config.yml.tmpl b/x-pack/auditbeat/module/system/_meta/config.yml.tmpl index 33cca5a4ebdb..0b8bca434f77 100644 --- a/x-pack/auditbeat/module/system/_meta/config.yml.tmpl +++ b/x-pack/auditbeat/module/system/_meta/config.yml.tmpl @@ -1,5 +1,6 @@ {{ if .Reference -}} {{ end -}} +{{ if ne .GOOS "windows" -}} - module: system metricsets: @@ -7,16 +8,13 @@ - packages - processes {{ if eq .GOOS "linux" -}} + - socket - user {{- end }} state.period: 12h report_changes: true - - {{ if eq .GOOS "darwin" -}} - {{ else if eq .GOOS "windows" -}} - {{ else -}} - {{- end }} +{{- end }} {{ if .Reference }} {{- end }} diff --git a/x-pack/auditbeat/module/system/socket/_meta/data.json b/x-pack/auditbeat/module/system/socket/_meta/data.json new file mode 100644 index 000000000000..34e4aeaf777a --- /dev/null +++ b/x-pack/auditbeat/module/system/socket/_meta/data.json @@ -0,0 +1,33 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "agent": { + "hostname": "host.example.com", + "name": "host.example.com" + }, + "user": { + "name": "vagrant", + "id": 1000 + }, + "process": { + "pid": 4021, + "name": "lynx" + }, + "source": { + "ip": "10.0.2.15", + "port": 33956 + }, + "destination": { + "port": 443, + "ip": "52.10.168.186" + }, + "event": { + "type": "event", + "action": "socket_opened", + "module": "system", + "dataset": "socket" + }, + "network": { + "type": "ipv4", + "direction": "outbound" + } +} diff --git a/x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc b/x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc new file mode 100644 index 000000000000..fc902d7cf674 --- /dev/null +++ b/x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc @@ -0,0 +1,8 @@ +The System `socket` metricset provides ... TODO. + +The module is implemented for Linux only. + +[float] +=== Configuration options + +TODO diff --git a/x-pack/auditbeat/module/system/socket/config.go b/x-pack/auditbeat/module/system/socket/config.go new file mode 100644 index 000000000000..cdd170ff8b8c --- /dev/null +++ b/x-pack/auditbeat/module/system/socket/config.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package socket + +import ( + "time" +) + +// Config defines the socket metricset's configuration options. +type Config struct { + StatePeriod time.Duration `config:"state.period"` + SocketStatePeriod time.Duration `config:"socket.state.period"` +} + +// Validate validates the host metricset config. +func (c *Config) Validate() error { + return nil +} + +func (c *Config) effectiveStatePeriod() time.Duration { + if c.SocketStatePeriod != 0 { + return c.SocketStatePeriod + } + return c.StatePeriod +} + +var defaultConfig = Config{ + StatePeriod: 1 * time.Hour, +} diff --git a/x-pack/auditbeat/module/system/socket/socket.go b/x-pack/auditbeat/module/system/socket/socket.go new file mode 100644 index 000000000000..90a7a004c33b --- /dev/null +++ b/x-pack/auditbeat/module/system/socket/socket.go @@ -0,0 +1,352 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build linux + +package socket + +import ( + "fmt" + "net" + "os/user" + "strconv" + "syscall" + "time" + + "github.com/OneOfOne/xxhash" + "github.com/gofrs/uuid" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" + sock "github.com/elastic/beats/metricbeat/helper/socket" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/x-pack/auditbeat/cache" + "github.com/elastic/gosigar/sys/linux" +) + +const ( + moduleName = "system" + metricsetName = "socket" + + eventTypeState = "state" + eventTypeEvent = "event" + + eventActionExistingSocket = "existing_socket" + eventActionSocketOpened = "socket_opened" + eventActionSocketClosed = "socket_closed" +) + +func init() { + mb.Registry.MustAddMetricSet(moduleName, metricsetName, New, + mb.DefaultMetricSet(), + ) +} + +// MetricSet collects data about sockets. +type MetricSet struct { + mb.BaseMetricSet + config Config + cache *cache.Cache + log *logp.Logger + + netlink *sock.NetlinkSession + // TODO: Replace with process data collected in processes metricset + ptable *sock.ProcTable + listeners *sock.ListenerTable + + lastState time.Time +} + +// Socket represents information about a socket. +type Socket struct { + Family linux.AddressFamily + LocalIP net.IP + LocalPort int + RemoteIP net.IP + RemotePort int + Inode uint32 + Direction sock.Direction + UID uint32 + Username string + ProcessPID int + ProcessName string + Error error +} + +// newSocket creates a new socket out of a netlink diag message. +func newSocket(diag *linux.InetDiagMsg) *Socket { + return &Socket{ + Family: linux.AddressFamily(diag.Family), + LocalIP: diag.SrcIP(), + LocalPort: diag.SrcPort(), + RemoteIP: diag.DstIP(), + RemotePort: diag.DstPort(), + Inode: diag.Inode, + UID: diag.UID, + ProcessPID: -1, + } +} + +// Hash creates a hash for Socket. +func (s Socket) Hash() uint64 { + h := xxhash.New64() + h.WriteString(s.LocalIP.String()) + h.WriteString(s.RemoteIP.String()) + h.WriteString(strconv.Itoa(s.LocalPort)) + h.WriteString(strconv.Itoa(s.RemotePort)) + h.WriteString(strconv.FormatUint(uint64(s.Inode), 10)) + return h.Sum64() +} + +func (s Socket) toMapStr() common.MapStr { + mapstr := common.MapStr{ + "network": common.MapStr{ + "type": s.Family.String(), + }, + "user": common.MapStr{ + "id": s.UID, + }, + } + + if s.Username != "" { + mapstr.Put("user.name", s.Username) + } + + if s.ProcessName != "" { + mapstr.Put("process", common.MapStr{ + "pid": s.ProcessPID, + "name": s.ProcessName, + }) + } + + switch s.Direction { + case sock.Outgoing: + mapstr.Put("network.direction", "outbound") + mapstr.Put("source", common.MapStr{ + "ip": s.LocalIP, + "port": s.LocalPort, + }) + mapstr.Put("destination", common.MapStr{ + "ip": s.RemoteIP, + "port": s.RemotePort, + }) + case sock.Incoming: + mapstr.Put("network.direction", "inbound") + mapstr.Put("source", common.MapStr{ + "ip": s.RemoteIP, + "port": s.RemotePort, + }) + mapstr.Put("destination", common.MapStr{ + "ip": s.LocalIP, + "port": s.LocalPort, + }) + case sock.Listening: + mapstr.Put("network.direction", "listening") + mapstr.Put("destination", common.MapStr{ + "ip": s.LocalIP, + "port": s.LocalPort, + }) + } + + if s.Error != nil { + mapstr.Put("error.message", s.Error.Error()) + } + + return mapstr +} + +// New constructs a new MetricSet. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Experimental("The %v/%v dataset is experimental", moduleName, metricsetName) + + config := defaultConfig + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, errors.Wrapf(err, "failed to unpack the %v/%v config", moduleName, metricsetName) + } + + ptable, err := sock.NewProcTable("") + if err != nil { + return nil, errors.Wrap(err, "failed to create process table") + } + + ms := &MetricSet{ + BaseMetricSet: base, + config: config, + log: logp.NewLogger(metricsetName), + cache: cache.New(), + netlink: sock.NewNetlinkSession(), + ptable: ptable, + listeners: sock.NewListenerTable(), + } + + return ms, nil +} + +// Fetch collects the user information. It is invoked periodically. +func (ms *MetricSet) Fetch(report mb.ReporterV2) { + needsStateUpdate := time.Since(ms.lastState) > ms.config.effectiveStatePeriod() + if needsStateUpdate || ms.cache.IsEmpty() { + ms.log.Debugf("State update needed (needsStateUpdate=%v, cache.IsEmpty()=%v)", needsStateUpdate, ms.cache.IsEmpty()) + err := ms.reportState(report) + if err != nil { + ms.log.Error(err) + report.Error(err) + } + ms.log.Debugf("Next state update by %v", ms.lastState.Add(ms.config.effectiveStatePeriod())) + } + + err := ms.reportChanges(report) + if err != nil { + ms.log.Error(err) + report.Error(err) + } +} + +// reportState reports all existing sockets on the system. +func (ms *MetricSet) reportState(report mb.ReporterV2) error { + ms.lastState = time.Now() + + sockets, err := ms.getSockets() + if err != nil { + return errors.Wrap(err, "failed to get sockets") + } + ms.log.Debugf("Found %d sockets", len(sockets)) + + stateID, err := uuid.NewV4() + if err != nil { + return errors.Wrap(err, "error generating state ID") + } + + // Refresh data for direction and process enrichment + ms.refreshEnrichments(sockets) + + for _, socket := range sockets { + err = ms.enrichSocket(socket) + if err != nil { + return err + } + + event := socketEvent(socket, eventTypeState, eventActionExistingSocket) + event.RootFields.Put("event.id", stateID.String()) + report.Event(event) + } + + // This will initialize the cache with the current sockets + ms.cache.DiffAndUpdateCache(convertToCacheable(sockets)) + + return nil +} + +// reportChanges detects and reports any changes to sockets on this system since the last call. +func (ms *MetricSet) reportChanges(report mb.ReporterV2) error { + sockets, err := ms.getSockets() + if err != nil { + return errors.Wrap(err, "failed to get sockets") + } + + opened, closed := ms.cache.DiffAndUpdateCache(convertToCacheable(sockets)) + ms.log.Debugf("Found %d sockets (%d opened, %d closed)", len(sockets), len(opened), len(closed)) + + if len(opened) > 0 { + // Refresh data for direction and process enrichment - only new sockets + // need enrichment + ms.refreshEnrichments(sockets) + + for _, s := range opened { + err = ms.enrichSocket(s.(*Socket)) + if err != nil { + return err + } + + report.Event(socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketOpened)) + } + } + + for _, s := range closed { + report.Event(socketEvent(s.(*Socket), eventTypeEvent, eventActionSocketClosed)) + } + + return nil +} + +func socketEvent(socket *Socket, eventType string, eventAction string) mb.Event { + event := mb.Event{ + RootFields: socket.toMapStr(), + } + + event.RootFields.Put("event.type", eventType) + event.RootFields.Put("event.action", eventAction) + + return event +} + +func convertToCacheable(sockets []*Socket) []cache.Cacheable { + c := make([]cache.Cacheable, 0, len(sockets)) + + for _, s := range sockets { + c = append(c, s) + } + + return c +} + +func (ms *MetricSet) enrichSocket(socket *Socket) error { + userAccount, err := user.LookupId(strconv.FormatUint(uint64(socket.UID), 10)) + if err != nil { + return errors.Wrapf(err, "error looking up socket UID") + } + + socket.Username = userAccount.Username + + socket.Direction = ms.listeners.Direction(uint8(syscall.IPPROTO_TCP), + socket.LocalIP, socket.LocalPort, socket.RemoteIP, socket.RemotePort) + + if ms.ptable != nil { + proc := ms.ptable.ProcessBySocketInode(socket.Inode) + if proc != nil { + // Add process info by finding the process that holds the socket's inode. + socket.ProcessPID = proc.PID + socket.ProcessName = proc.Command + } else if socket.Inode == 0 { + socket.Error = fmt.Errorf("process has exited (inode=%v)", socket.Inode) + } else { + socket.Error = fmt.Errorf("process not found (inode=%v)", socket.Inode) + } + } + + return nil +} + +func (ms *MetricSet) getSockets() ([]*Socket, error) { + diags, err := ms.netlink.GetSocketList() + if err != nil { + return nil, errors.Wrap(err, "error getting sockets") + } + + sockets := make([]*Socket, 0, len(diags)) + for _, diag := range diags { + sockets = append(sockets, newSocket(diag)) + } + + return sockets, nil +} + +func (ms *MetricSet) refreshEnrichments(sockets []*Socket) { + // Refresh inode to process mapping for process enrichment + err := ms.ptable.Refresh() + if err != nil { + // Errors here can happen, e.g. if a process exits while its /proc is being read. + ms.log.Warn(errors.Wrap(err, "error refreshing process data")) + } + + // Register all listening sockets + ms.listeners.Reset() + for _, socket := range sockets { + if socket.RemotePort == 0 { + ms.listeners.Put(uint8(syscall.IPPROTO_TCP), socket.LocalIP, socket.LocalPort) + } + } +} diff --git a/x-pack/auditbeat/module/system/socket/socket_other.go b/x-pack/auditbeat/module/system/socket/socket_other.go new file mode 100644 index 000000000000..2c8c92882e39 --- /dev/null +++ b/x-pack/auditbeat/module/system/socket/socket_other.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !linux + +package socket + +import ( + "fmt" + + "github.com/elastic/beats/metricbeat/mb" +) + +const ( + moduleName = "system" + metricsetName = "socket" +) + +func init() { + mb.Registry.MustAddMetricSet(moduleName, metricsetName, New, + mb.DefaultMetricSet(), + ) +} + +// New returns an error. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + return nil, fmt.Errorf("the %v/%v dataset is only supported on Linux", moduleName, metricsetName) +} diff --git a/x-pack/auditbeat/module/system/socket/socket_test.go b/x-pack/auditbeat/module/system/socket/socket_test.go new file mode 100644 index 000000000000..e7824cab83b0 --- /dev/null +++ b/x-pack/auditbeat/module/system/socket/socket_test.go @@ -0,0 +1,122 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build linux + +package socket + +import ( + "net" + "os" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/auditbeat/core" + "github.com/elastic/beats/metricbeat/mb" + mbtest "github.com/elastic/beats/metricbeat/mb/testing" +) + +func TestData(t *testing.T) { + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("received error: %+v", errs[0]) + } + if len(events) == 0 { + t.Fatal("no events were generated") + } + fullEvent := mbtest.StandardizeEvent(f, events[0], core.AddDatasetToEvent) + mbtest.WriteEventToDataJSON(t, fullEvent, "") +} + +func TestFetch(t *testing.T) { + // Consume first event: list of all currently open sockets + ms := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(ms) + if errs != nil { + t.Fatal("fetch", errs) + } + _, err := events[0].RootFields.HasKey("destination.port") + assert.NoError(t, err) + + ln, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + addr := ln.Addr().String() + i := strings.LastIndex(addr, ":") + listenerPort, err := strconv.Atoi(addr[i+1:]) + if err != nil { + t.Fatal("failed to get port from addr", addr) + } + + // Consume second event: Socket we just opened + events, errs = mbtest.ReportingFetchV2(ms) + if errs != nil { + t.Fatal("fetch", errs) + } + + var found bool + for _, evt := range events { + port, ok := getRequiredValue("destination.port", evt, t).(int) + if !ok { + t.Fatal("destination.port is not an int") + } + if port != listenerPort { + continue + } + + pid, ok := getRequiredValue("process.pid", evt, t).(int) + if !ok { + t.Fatal("process.pid is not an int") + } + assert.Equal(t, os.Getpid(), pid) + + processName, ok := getRequiredValue("process.name", evt, t).(string) + if !ok { + t.Fatal("process.name is not a string") + } + assert.Equal(t, "socket.test", processName) + + uid, ok := getRequiredValue("user.id", evt, t).(uint32) + if !ok { + t.Fatal("user.uid is not a uint32") + } + assert.EqualValues(t, os.Geteuid(), uid) + + dir, ok := getRequiredValue("network.direction", evt, t).(string) + if !ok { + t.Fatal("network.direction is not a string") + } + assert.Equal(t, "listening", dir) + + found = true + break + } + + assert.True(t, found, "listener not found") +} + +func getRequiredValue(key string, mbEvent mb.Event, t testing.TB) interface{} { + v, err := mbEvent.RootFields.GetValue(key) + if err != nil { + t.Fatalf("err=%v, key=%v, event=%v", key, err, mbEvent) + } + if v == nil { + t.Fatalf("key %v not found in %v", key, mbEvent) + } + return v +} + +func getConfig() map[string]interface{} { + return map[string]interface{}{ + "module": "system", + "metricsets": []string{"socket"}, + } +} diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py index 2c035a111339..2cc220105bf5 100644 --- a/x-pack/auditbeat/tests/system/test_metricsets.py +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -44,6 +44,22 @@ def test_metricset_processes(self): # Metricset is experimental and that generates a warning, TODO: remove later self.check_metricset("system", "processes", COMMON_FIELDS + fields, warnings_allowed=True) + @unittest.skipUnless(sys.platform == "linux2", "Only implemented for Linux") + def test_metricset_socket(self): + """ + socket metricset collects information about open sockets on a system. + """ + + fields = ["destination.port"] + + # Metricset is experimental and that generates a warning, TODO: remove later + # TODO: Remove try/catch once `network.type` is in fields.ecs.yml + try: + self.check_metricset("system", "socket", COMMON_FIELDS + fields, warnings_allowed=True) + except Exception as e: + if "network.type" not in str(e): + raise + def test_metricset_user(self): """ user metricset collects information about users on a server.