Skip to content

Commit

Permalink
Merge pull request #557 from monicasarbu/fix_ignore_outgoing
Browse files Browse the repository at this point in the history
Use direction to check if it's an outgoing leg
  • Loading branch information
tsg committed Dec 18, 2015
2 parents 96866c7 + 1627ba7 commit 95414f7
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
*Affecting all Beats*

*Packetbeat*
- Fix setting direction to out and use its value to decide when dropping events if ignore_outgoing is enabled {pull}557[557]

*Topbeat*

Expand Down
16 changes: 10 additions & 6 deletions libbeat/publisher/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool {
event["client_proc"] = src.Proc
event["client_server"] = srcServer
delete(event, "src")

// check if it's outgoing transaction (as client)
if publisher.IsPublisherIP(src.Ip) {
//outgoing transaction
event["direction"] = "out"
}

}
dst, ok := event["dst"].(*common.Endpoint)
if ok {
Expand All @@ -150,18 +157,15 @@ func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool {
event["server"] = dstServer
delete(event, "dst")

//get the direction of the transaction: outgoing (as client)/incoming (as server)
//check if it's incoming transaction (as server)
if publisher.IsPublisherIP(dst.Ip) {
// incoming transaction
event["direction"] = "in"
} else {
//outgoing transaction
event["direction"] = "out"
}

}

if publisher.IgnoreOutgoing && dstServer != "" &&
dstServer != publisher.name {
if publisher.IgnoreOutgoing && event["direction"] == "out" {
// duplicated transaction -> ignore it
debug("Ignore duplicated transaction on %s: %s -> %s",
publisher.name, srcServer, dstServer)
Expand Down
82 changes: 82 additions & 0 deletions libbeat/publisher/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,85 @@ func TestFilterEvent(t *testing.T) {
assert.Regexp(t, test.err, filterEvent(test.f()))
}
}

func TestDirectionOut(t *testing.T) {
publisher := PublisherType{}

publisher.ipaddrs = []string{"192.145.2.4"}

event := common.MapStr{
"src": &common.Endpoint{
Ip: "192.145.2.4",
Port: 3267,
Name: "server1",
Cmdline: "proc1 start",
Proc: "proc1",
},
"dst": &common.Endpoint{
Ip: "192.145.2.5",
Port: 32232,
Name: "server2",
Cmdline: "proc2 start",
Proc: "proc2",
},
}

assert.True(t, updateEventAddresses(&publisher, event))
assert.True(t, event["client_ip"] == "192.145.2.4")
assert.True(t, event["direction"] == "out")
}

func TestDirectionIn(t *testing.T) {
publisher := PublisherType{}

publisher.ipaddrs = []string{"192.145.2.5"}

event := common.MapStr{
"src": &common.Endpoint{
Ip: "192.145.2.4",
Port: 3267,
Name: "server1",
Cmdline: "proc1 start",
Proc: "proc1",
},
"dst": &common.Endpoint{
Ip: "192.145.2.5",
Port: 32232,
Name: "server2",
Cmdline: "proc2 start",
Proc: "proc2",
},
}

assert.True(t, updateEventAddresses(&publisher, event))
assert.True(t, event["client_ip"] == "192.145.2.4")
assert.True(t, event["direction"] == "in")
}

func TestNoDirection(t *testing.T) {
publisher := PublisherType{}

publisher.ipaddrs = []string{"192.145.2.6"}

event := common.MapStr{
"src": &common.Endpoint{
Ip: "192.145.2.4",
Port: 3267,
Name: "server1",
Cmdline: "proc1 start",
Proc: "proc1",
},
"dst": &common.Endpoint{
Ip: "192.145.2.5",
Port: 32232,
Name: "server2",
Cmdline: "proc2 start",
Proc: "proc2",
},
}

assert.True(t, updateEventAddresses(&publisher, event))
assert.True(t, event["client_ip"] == "192.145.2.4")
_, ok := event["direction"]
assert.False(t, ok)
}

0 comments on commit 95414f7

Please sign in to comment.