Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Packetbeat*

- Prevent incorrect use of AMQP protocol parsing from causing silent failure. {pull}29017[29017]
- Fix error handling in MongoDB protocol parsing. {pull}29017[29017]

*Winlogbeat*

Expand Down
2 changes: 1 addition & 1 deletion packetbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var RootCmd *cmd.BeatsRootCmd

// PacketbeatSettings contains the default settings for packetbeat
func PacketbeatSettings() instance.Settings {
var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("I"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("t"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("O"))
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/config/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, e
mergeConfig, err := common.NewConfigFrom(common.MapStr{
"index": datastreamConfig.Datastream.Type + "-" + datastreamConfig.Datastream.Dataset + "-" + namespace,
"processors": append([]common.MapStr{
common.MapStr{
{
"add_fields": common.MapStr{
"target": "data_stream",
"fields": common.MapStr{
Expand All @@ -76,7 +76,7 @@ func (i agentInput) addProcessorsAndIndex(cfg *common.Config) (*common.Config, e
},
},
},
common.MapStr{
{
"add_fields": common.MapStr{
"target": "event",
"fields": common.MapStr{
Expand Down
3 changes: 2 additions & 1 deletion packetbeat/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func New(
d := Decoder{
flows: f,
decoders: make(map[gopacket.LayerType]gopacket.DecodingLayer),
icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp}
icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp,
}
d.stD1Q.init(&d.d1q[0], &d.d1q[1])
d.stIP4.init(&d.ip4[0], &d.ip4[1])
d.stIP6.init(&d.ip6[0], &d.ip6[1])
Expand Down
6 changes: 5 additions & 1 deletion packetbeat/flows/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@ func TestFlowsCounting(t *testing.T) {
assert.NoError(t, err)

uint1, err := module.NewUint("uint1")
assert.NoError(t, err)
uint2, err := module.NewUint("uint2")
assert.NoError(t, err)
int1, err := module.NewInt("int1")
assert.NoError(t, err)
int2, err := module.NewInt("int2")
assert.NoError(t, err)
float1, err := module.NewFloat("float1")
assert.NoError(t, err)
float2, err := module.NewFloat("float2")

assert.NoError(t, err)

pub := &flowsChan{make(chan []beat.Event, 1)}
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func makeWorker(
if align > 0 {
// round time to nearest 10 seconds for alignment
aligned := time.Unix(((time.Now().Unix()+(align-1))/align)*align, 0)
waitStart := aligned.Sub(time.Now())
waitStart := time.Until(aligned)
debugf("worker wait start(%v): %v", aligned, waitStart)
if cont := w.sleep(waitStart); !cont {
return
Expand Down
8 changes: 3 additions & 5 deletions packetbeat/flows/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ import (
"github.com/elastic/beats/v7/packetbeat/procs"
)

var (
// Use `go test -data` to update sample event files.
dataFlag = flag.Bool("data", false, "Write updated data.json files")
)
// Use `go test -data` to update sample event files.
var dataFlag = flag.Bool("data", false, "Write updated data.json files")

func TestCreateEvent(t *testing.T) {
logp.TestingSetup()
Expand Down Expand Up @@ -124,7 +122,7 @@ func TestCreateEvent(t *testing.T) {
t.Fatal(err)
}

if err := ioutil.WriteFile("../_meta/sample_outputs/flow.json", output, 0644); err != nil {
if err := ioutil.WriteFile("../_meta/sample_outputs/flow.json", output, 0o644); err != nil {
t.Fatal(err)
}
}
Expand Down
10 changes: 5 additions & 5 deletions packetbeat/pb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,14 @@ func (f *Fields) ComputeValues(localIPs []net.IP, internalNetworks []string) err
}

// network.community_id
switch {
case f.Network.Transport == "udp":
switch f.Network.Transport {
case "udp":
flow.Protocol = 17
case f.Network.Transport == "tcp":
case "tcp":
flow.Protocol = 6
case f.Network.Transport == "icmp":
case "icmp":
flow.Protocol = 1
case f.Network.Transport == "ipv6-icmp":
case "ipv6-icmp":
flow.Protocol = 58
}
flow.ICMP.Type = f.ICMPType
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/processor/add_kubernetes_metadata/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func init() {
// Register default indexers
cfg := common.NewConfig()

//Add IP Port Indexer as a default indexer
// Add IP Port Indexer as a default indexer
kubernetes.Indexing.AddDefaultIndexerConfig(kubernetes.IPPortIndexerName, *cfg)

formatCfg, err := common.NewConfigFrom(map[string]interface{}{
"format": "%{[ip]}:%{[port]}",
})
if err == nil {
//Add field matcher with field to lookup as metricset.host
// Add field matcher with field to lookup as metricset.host
kubernetes.Indexing.AddDefaultMatcherConfig(kubernetes.FieldFormatMatcherName, *formatCfg)
}
}
2 changes: 1 addition & 1 deletion packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
defer func() {
logp.Debug("procsdetailed", "updateMap() took %v", time.Now().Sub(start))
logp.Debug("procsdetailed", "updateMap() took %v", time.Since(start))
}()
}

Expand Down
4 changes: 2 additions & 2 deletions packetbeat/procs/procs_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func createFakeDirectoryStructure(prefix string, files []testProcFile) error {
var err error
for _, file := range files {
dir := filepath.Dir(file.path)
err = os.MkdirAll(filepath.Join(prefix, dir), 0755)
err = os.MkdirAll(filepath.Join(prefix, dir), 0o755)
if err != nil {
return err
}

if !file.isLink {
err = ioutil.WriteFile(filepath.Join(prefix, file.path),
[]byte(file.contents), 0644)
[]byte(file.contents), 0o644)
if err != nil {
return err
}
Expand Down
16 changes: 10 additions & 6 deletions packetbeat/procs/procs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,17 @@ type extractor interface {
Size() int
}

type callbackFn func(net.IP, uint16, int)
type extractorFactory func(fn callbackFn) extractor
type (
callbackFn func(net.IP, uint16, int)
extractorFactory func(fn callbackFn) extractor
)

type tcpRowOwnerPIDExtractor callbackFn
type tcp6RowOwnerPIDExtractor callbackFn
type udpRowOwnerPIDExtractor callbackFn
type udp6RowOwnerPIDExtractor callbackFn
type (
tcpRowOwnerPIDExtractor callbackFn
tcp6RowOwnerPIDExtractor callbackFn
udpRowOwnerPIDExtractor callbackFn
udp6RowOwnerPIDExtractor callbackFn
)

var tablesByTransport = map[applayer.Transport][]struct {
family uint32
Expand Down
38 changes: 26 additions & 12 deletions packetbeat/procs/procs_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,33 @@ func TestParseTableRaw(t *testing.T) {
expected []portProcMapping
mustErr bool
}{
{"Empty table IPv4", IPv4,
"00000000", nil, false},
{"Empty table IPv6", IPv6,
"00000000", nil, false},
{"Short table (no length)", IPv4,
"000000", nil, true},
{"Short table (partial entry)", IPv6,
"01000000AAAAAAAAAAAAAAAAAAAA", nil, true},
{"One entry (IPv4)", IPv4,
{
"Empty table IPv4", IPv4,
"00000000", nil, false,
},
{
"Empty table IPv6", IPv6,
"00000000", nil, false,
},
{
"Short table (no length)", IPv4,
"000000", nil, true,
},
{
"Short table (partial entry)", IPv6,
"01000000AAAAAAAAAAAAAAAAAAAA", nil, true,
},
{
"One entry (IPv4)", IPv4,
"01000000" +
"77777777AAAAAAAA12340000BBBBBBBBFFFF0000CCCCCCCC",
[]portProcMapping{
{endpoint: endpoint{address: "170.170.170.170", port: 0x1234}, pid: int(pid)},
}, false},
{"Two entries (IPv6)", IPv6,
},
false,
},
{
"Two entries (IPv6)", IPv6,
"02000000" +
// First entry
"11112222333344445555666677778888F0F0F0F0" +
Expand All @@ -76,7 +88,9 @@ func TestParseTableRaw(t *testing.T) {
[]portProcMapping{
{endpoint: endpoint{address: "1111:2222:3333:4444:5555:6666:7777:8888", port: 0xABCD}, pid: 1},
{endpoint: endpoint{address: "aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa", port: 0}, pid: 0xffff},
}, false},
},
false,
},
} {
msg := fmt.Sprintf("Test case #%d: %s", idx+1, testCase.name)
table, err := hex.DecodeString(testCase.raw)
Expand Down
4 changes: 1 addition & 3 deletions packetbeat/procs/zsyscall_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ const (
errnoERROR_IO_PENDING = 997
)

var (
errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING)
)
var errERROR_IO_PENDING error = syscall.Errno(errnoERROR_IO_PENDING)

// errnoErr returns common boxed Errno values, to prevent
// allocations at runtime.
Expand Down
Loading