diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4b51cab3d3bc..53043aba64b7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -108,6 +108,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff] - Heartbeat now always downloads the entire body of HTTP endpoints, even if no checks against the body content are declared. This fixes an issue where timing metrics would be incorrect in scenarios where the body wasn't used since the connection would be closed soon after the headers were sent, but before the entire body was. {pull}8894[8894] - `Host` header can now be overridden for HTTP requests sent by Heartbeat monitors. {pull}9148[9516] +- Fix checks for TCP send/receive data {pull}10777[10777] *Journalbeat* diff --git a/heartbeat/monitors/active/tcp/task.go b/heartbeat/monitors/active/tcp/task.go index dbe7bee24657..876b78a92ac0 100644 --- a/heartbeat/monitors/active/tcp/task.go +++ b/heartbeat/monitors/active/tcp/task.go @@ -71,7 +71,8 @@ func pingHost( }, }) if err != nil { - event.PutValue("error", reason.FailValidate(err)) + return reason.MakeValidateError(err) } + return nil } diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index 59a9c69a462c..efa9745ce55e 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -25,6 +25,7 @@ import ( "net/http/httptest" "net/url" "os" + "strconv" "testing" "github.com/stretchr/testify/require" @@ -38,11 +39,16 @@ import ( ) func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event { - config, err := common.NewConfigFrom(common.MapStr{ + config := common.MapStr{ "hosts": host, "ports": port, "timeout": "1s", - }) + } + return testTCPConfigCheck(t, config, host, port) +} + +func testTCPConfigCheck(t *testing.T, configMap common.MapStr, host string, port uint16) *beat.Event { + config, err := common.NewConfigFrom(configMap) require.NoError(t, err) jobs, endpoints, err := create("tcp", config) @@ -230,3 +236,108 @@ func TestNXDomain(t *testing.T) { event.Fields, ) } + +func TestCheckUp(t *testing.T) { + host, port, ip, closeEcho, err := startEchoServer(t) + require.NoError(t, err) + defer closeEcho() + + configMap := common.MapStr{ + "hosts": host, + "ports": port, + "timeout": "1s", + "check.receive": "echo123", + "check.send": "echo123", + } + + event := testTCPConfigCheck(t, configMap, host, port) + + mapvaltest.Test( + t, + mapval.Strict(mapval.Compose( + tcpMonitorChecks(host, ip, port, "up"), + hbtest.RespondingTCPChecks(port), + mapval.MustCompile(mapval.Map{ + "resolve": mapval.Map{ + "host": "localhost", + "ip": ip, + "rtt.us": mapval.IsDuration, + }, + "tcp": mapval.Map{ + "rtt.validate.us": mapval.IsDuration, + }, + }), + )), + event.Fields, + ) +} + +func TestCheckDown(t *testing.T) { + host, port, ip, closeEcho, err := startEchoServer(t) + require.NoError(t, err) + defer closeEcho() + + configMap := common.MapStr{ + "hosts": host, + "ports": port, + "timeout": "1s", + "check.receive": "BOOM", // should fail + "check.send": "echo123", + } + + event := testTCPConfigCheck(t, configMap, host, port) + + mapvaltest.Test( + t, + mapval.Strict(mapval.Compose( + tcpMonitorChecks(host, ip, port, "down"), + hbtest.RespondingTCPChecks(port), + mapval.MustCompile(mapval.Map{ + "resolve": mapval.Map{ + "host": "localhost", + "ip": ip, + "rtt.us": mapval.IsDuration, + }, + "tcp": mapval.Map{ + "rtt.validate.us": mapval.IsDuration, + }, + "error": mapval.Map{ + "type": "validate", + "message": "received string mismatch", + }, + }), + )), + event.Fields, + ) +} + +// startEchoServer starts a simple TCP echo server for testing. Only handles a single connection once. +// Note you MUST connect to this server exactly once to avoid leaking a goroutine. This is only useful +// for the specific tests used here. +func startEchoServer(t *testing.T) (host string, port uint16, ip string, close func() error, err error) { + // Simple echo server + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + return "", 0, "", nil, err + } + go func() { + conn, err := listener.Accept() + require.NoError(t, err) + buf := make([]byte, 1024) + rlen, err := conn.Read(buf) + require.NoError(t, err) + wlen, err := conn.Write(buf[:rlen]) + require.NoError(t, err) + // Normally we'd retry partial writes, but for tests this is OK + require.Equal(t, wlen, rlen) + }() + + ip, portStr, err := net.SplitHostPort(listener.Addr().String()) + portUint64, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + listener.Close() + return "", 0, "", nil, err + } + + return "localhost", uint16(portUint64), ip, listener.Close, nil +} diff --git a/heartbeat/reason/reason.go b/heartbeat/reason/reason.go index 08c7724f0162..d5fc5e456898 100644 --- a/heartbeat/reason/reason.go +++ b/heartbeat/reason/reason.go @@ -68,4 +68,7 @@ func Fail(r Reason) common.MapStr { func FailIO(err error) common.MapStr { return Fail(IOError{err}) } -func FailValidate(err error) common.MapStr { return Fail(ValidateError{err}) } +// MakeValidateError creates an instance of ValidateError from the given error. +func MakeValidateError(err error) ValidateError { + return ValidateError{err} +}