Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
5b0d079
Change to xxhash.
Sep 28, 2018
04f0189
Add sockets metricset and users info.
Oct 19, 2018
be4f900
Error handling
Oct 23, 2018
a9f0a27
Add enrichment with RPC services to sockets.
Oct 23, 2018
9e3fc9a
Addd diffs to sockets.
Oct 24, 2018
9355415
Unify status field values.
Oct 24, 2018
e68cc07
Fix import order.
Oct 25, 2018
78a3ab6
Roll back changes to connection in socket.go
Oct 25, 2018
22e9b72
Remove dublicate import.
Oct 25, 2018
3a37b66
Correct sockets config.
Oct 25, 2018
956a840
Small fixes.
Oct 26, 2018
2e81e6d
data.json for sockets metricset.
Oct 26, 2018
1db2569
Fix import order.
Oct 29, 2018
096e2e2
Working RPC enrichment.
Oct 29, 2018
10466bf
Remove RPC enrichment.
Oct 29, 2018
09df2aa
Unit test for sockets.
Oct 29, 2018
f94e1bd
System test for sockets.
Oct 29, 2018
5013d74
Update fields.yml.
Oct 30, 2018
2c1d9e3
Small fixes.
Oct 30, 2018
366ca7e
Move user information to separate PR.
Oct 30, 2018
f3e978a
Add comments to NetlinkSession.
Oct 30, 2018
7a52059
Rename metricset to socket.
Nov 14, 2018
8030aa4
Move reused Metricbeat functionality to helper package.
Nov 15, 2018
f7005ff
Revert unrelated changes.
Nov 15, 2018
611f49a
Metricset factory for non-Linux systems.
Nov 15, 2018
abc8a9a
Switch to single documents.
Nov 15, 2018
f31813b
Add top-level ECS process information.
Nov 15, 2018
6910430
Change to PushMetricSetV2.
Nov 15, 2018
e8c1203
Working netlink subscription.
Nov 15, 2018
b117a37
Switch back to Fetch metricset.
Nov 21, 2018
3a5817c
ECS fields.
Nov 21, 2018
1ed02be
Update fields.yml.
Nov 21, 2018
627881b
ECS field values for network.direction.
Nov 21, 2018
b32fc7a
Be lenient with errors during ptable refresh.
Nov 22, 2018
a6884ad
Remove TCP state.
Nov 30, 2018
2d0e0db
Change to top-level ECS fields only.
Nov 30, 2018
8a98827
Remove bucket, report state every 1 hour.
Nov 30, 2018
27b91b9
Back out network.type from fields.ecs.yml and skip in test.
Nov 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions metricbeat/helper/socket/netlink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build linux

package socket

import (
"os"
"sync/atomic"

"github.com/pkg/errors"

"github.com/elastic/gosigar/sys/linux"
)

// NetlinkSession communicates with the kernel's netlink subsystem.
type NetlinkSession struct {
readBuffer []byte
seq uint32
}

// NewNetlinkSession creates a new netlink session.
func NewNetlinkSession() *NetlinkSession {
return &NetlinkSession{
readBuffer: make([]byte, os.Getpagesize()),
}
}

// GetSocketList retrieves the current list of sockets from the kernel.
func (session *NetlinkSession) GetSocketList() ([]*linux.InetDiagMsg, error) {
// Send request over netlink and parse responses.
req := linux.NewInetDiagReq()
req.Header.Seq = atomic.AddUint32(&session.seq, 1)
sockets, err := linux.NetlinkInetDiagWithBuf(req, session.readBuffer, nil)
if err != nil {
return nil, errors.Wrap(err, "failed requesting socket dump")
}
return sockets, nil
}
28 changes: 12 additions & 16 deletions metricbeat/module/system/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ import (
"net"
"os"
"path/filepath"
"sync/atomic"
"syscall"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
sock "github.com/elastic/beats/metricbeat/helper/socket"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/system"
"github.com/elastic/gosigar/sys/linux"

"github.com/pkg/errors"
)

var (
Expand All @@ -50,14 +50,13 @@ func init() {

type MetricSet struct {
mb.BaseMetricSet
readBuffer []byte
seq uint32
ptable *ProcTable
netlink *sock.NetlinkSession
ptable *sock.ProcTable
euid int
previousConns hashSet
currentConns hashSet
reverseLookup *ReverseLookupCache
listeners *ListenerTable
listeners *sock.ListenerTable
users UserCache
}

Expand All @@ -72,7 +71,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.New("unexpected module type")
}

ptable, err := NewProcTable(filepath.Join(systemModule.HostFS, "/proc"))
ptable, err := sock.NewProcTable(filepath.Join(systemModule.HostFS, "/proc"))
if err != nil {
return nil, err
}
Expand All @@ -83,12 +82,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

m := &MetricSet{
BaseMetricSet: base,
readBuffer: make([]byte, os.Getpagesize()),
netlink: sock.NewNetlinkSession(),
ptable: ptable,
euid: os.Geteuid(),
previousConns: hashSet{},
currentConns: hashSet{},
listeners: NewListenerTable(),
listeners: sock.NewListenerTable(),
users: NewUserCache(),
}

Expand All @@ -114,10 +113,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
debugf("process table refresh had failures: %v", err)
}

// Send request over netlink and parse responses.
req := linux.NewInetDiagReq()
req.Header.Seq = atomic.AddUint32(&m.seq, 1)
sockets, err := linux.NetlinkInetDiagWithBuf(req, m.readBuffer, nil)
sockets, err := m.netlink.GetSocketList()
if err != nil {
return nil, errors.Wrap(err, "failed requesting socket dump")
}
Expand Down Expand Up @@ -192,7 +188,7 @@ func (m *MetricSet) enrichConnectionData(c *connection) {
c.LocalIP, c.LocalPort, c.RemoteIP, c.RemotePort)

// Reverse DNS lookup on the remote IP.
if m.reverseLookup != nil && c.Direction != Listening {
if m.reverseLookup != nil && c.Direction != sock.Listening {
hostname, err := m.reverseLookup.Lookup(c.RemoteIP)
if err != nil {
c.DestHostError = err
Expand Down Expand Up @@ -227,7 +223,7 @@ type connection struct {
RemotePort int

State linux.TCPState
Direction Direction
Direction sock.Direction

DestHost string // Reverse lookup of dest IP.
DestHostETLDPlusOne string
Expand Down
1 change: 1 addition & 0 deletions x-pack/auditbeat/include/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ import (
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/host"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/packages"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/processes"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/socket"
_ "github.com/elastic/beats/x-pack/auditbeat/module/system/user"
)
8 changes: 3 additions & 5 deletions x-pack/auditbeat/module/system/_meta/config.yml.tmpl
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
{{ if .Reference -}}
{{ end -}}
{{ if ne .GOOS "windows" -}}
- module: system

metricsets:
- host
- packages
- processes
{{ if eq .GOOS "linux" -}}
- socket
- user
{{- end }}

state.period: 12h

report_changes: true

{{ if eq .GOOS "darwin" -}}
{{ else if eq .GOOS "windows" -}}
{{ else -}}
{{- end }}
{{- end }}
{{ if .Reference }}
{{- end }}
33 changes: 33 additions & 0 deletions x-pack/auditbeat/module/system/socket/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"user": {
"name": "vagrant",
"id": 1000
},
"process": {
"pid": 4021,
"name": "lynx"
},
"source": {
"ip": "10.0.2.15",
"port": 33956
},
"destination": {
"port": 443,
"ip": "52.10.168.186"
},
"event": {
"type": "event",
"action": "socket_opened",
"module": "system",
"dataset": "socket"
},
"network": {
"type": "ipv4",
"direction": "outbound"
}
}
8 changes: 8 additions & 0 deletions x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
The System `socket` metricset provides ... TODO.

The module is implemented for Linux only.

[float]
=== Configuration options

TODO
31 changes: 31 additions & 0 deletions x-pack/auditbeat/module/system/socket/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package socket

import (
"time"
)

// Config defines the socket metricset's configuration options.
type Config struct {
StatePeriod time.Duration `config:"state.period"`
SocketStatePeriod time.Duration `config:"socket.state.period"`
}

// Validate validates the host metricset config.
func (c *Config) Validate() error {
return nil
}

func (c *Config) effectiveStatePeriod() time.Duration {
if c.SocketStatePeriod != 0 {
return c.SocketStatePeriod
}
return c.StatePeriod
}

var defaultConfig = Config{
StatePeriod: 1 * time.Hour,
}
Loading