diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index 0173b6760e69..fca1ab267d3b 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -97,10 +97,17 @@ func TLSChecks(chainIndex, certIndex int, certificate *x509.Certificate) mapval. // BaseChecks creates a skima.Validator that represents the "monitor" field present // in all heartbeat events. +// If IP is set to "" this will check that the field is not present func BaseChecks(ip string, status string, typ string) mapval.Validator { + var ipCheck mapval.IsDef + if len(ip) > 0 { + ipCheck = mapval.IsEqual(ip) + } else { + ipCheck = mapval.Optional(mapval.IsEqual(ip)) + } return mapval.MustCompile(mapval.Map{ "monitor": mapval.Map{ - "ip": ip, + "ip": ipCheck, "duration.us": mapval.IsDuration, "status": status, "id": mapval.IsNonEmptyString, diff --git a/heartbeat/monitors/active/dialchain/builder.go b/heartbeat/monitors/active/dialchain/builder.go index b8d522bbe1e6..301c932db573 100644 --- a/heartbeat/monitors/active/dialchain/builder.go +++ b/heartbeat/monitors/active/dialchain/builder.go @@ -18,12 +18,14 @@ package dialchain import ( + "fmt" "net" - "strconv" + "net/url" "time" "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/heartbeat/monitors/jobs" + "github.com/elastic/beats/heartbeat/monitors/wrappers" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -136,59 +138,60 @@ func MakeDialerJobs( ) ([]jobs.Job, error) { var jobs []jobs.Job for _, endpoint := range endpoints { - endpointJobs, err := makeEndpointJobs(b, scheme, endpoint, mode, fn) - if err != nil { - return nil, err + for _, port := range endpoint.Ports { + endpointURL, err := url.Parse(fmt.Sprintf("%s://%s:%d", scheme, endpoint.Host, port)) + if err != nil { + return nil, err + } + endpointJob, err := makeEndpointJob(b, endpointURL, mode, fn) + if err != nil { + return nil, err + } + jobs = append(jobs, wrappers.WithURLField(endpointURL, endpointJob)) } - jobs = append(jobs, endpointJobs...) + } return jobs, nil } -func makeEndpointJobs( +func makeEndpointJob( b *Builder, - scheme string, - endpoint Endpoint, + endpointURL *url.URL, mode monitors.IPSettings, fn func(*beat.Event, transport.Dialer, string) error, -) ([]jobs.Job, error) { +) (jobs.Job, error) { // Check if SOCKS5 is configured, with relying on the socks5 proxy // in resolving the actual IP. // Create one job for every port number configured. if b.resolveViaSocks5 { - js := make([]jobs.Job, len(endpoint.Ports)) - for i, port := range endpoint.Ports { - address := net.JoinHostPort(endpoint.Host, strconv.Itoa(int(port))) - js[i] = jobs.MakeSimpleJob(func(event *beat.Event) error { - return b.Run(event, address, func(event *beat.Event, dialer transport.Dialer) error { - return fn(event, dialer, address) + return wrappers.WithURLField(endpointURL, + jobs.MakeSimpleJob(func(event *beat.Event) error { + hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port()) + return b.Run(event, hostPort, func(event *beat.Event, dialer transport.Dialer) error { + return fn(event, dialer, hostPort) }) - }) - } - return js, nil + })), nil } // Create job that first resolves one or multiple IP (depending on // config.Mode) in order to create one continuation Task per IP. - settings := monitors.MakeHostJobSettings(endpoint.Host, mode) + settings := monitors.MakeHostJobSettings(endpointURL.Hostname(), mode) job, err := monitors.MakeByHostJob(settings, - monitors.MakePingAllIPPortFactory(endpoint.Ports, - func(event *beat.Event, ip *net.IPAddr, port uint16) error { + monitors.MakePingIPFactory( + func(event *beat.Event, ip *net.IPAddr) error { // use address from resolved IP - portStr := strconv.Itoa(int(port)) - ipAddr := net.JoinHostPort(ip.String(), portStr) - hostAddr := net.JoinHostPort(endpoint.Host, portStr) + ipPort := net.JoinHostPort(ip.String(), endpointURL.Port()) cb := func(event *beat.Event, dialer transport.Dialer) error { - return fn(event, dialer, hostAddr) + return fn(event, dialer, ipPort) } - err := b.Run(event, ipAddr, cb) + err := b.Run(event, ipPort, cb) return err })) if err != nil { return nil, err } - return []jobs.Job{job}, nil + return job, nil } diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index 827d7654a1cd..0fcc4329b9ce 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -23,11 +23,9 @@ import ( "strconv" "strings" - "github.com/elastic/beats/heartbeat/eventext" "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/heartbeat/monitors/active/dialchain" "github.com/elastic/beats/heartbeat/monitors/jobs" - "github.com/elastic/beats/heartbeat/monitors/wrappers" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -91,13 +89,6 @@ func create( epJobs, err := dialchain.MakeDialerJobs(db, scheme, eps, config.Mode, func(event *beat.Event, dialer transport.Dialer, addr string) error { - u, err := url.Parse(fmt.Sprintf("%s://%s", scheme, addr)) - if err != nil { - return err - } - - eventext.MergeEventFields(event, common.MapStr{"url": wrappers.URLFields(u)}) - return pingHost(event, dialer, addr, timeout, validator) }) if err != nil { diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index 3b25685636a4..af3026cbc002 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -97,10 +97,6 @@ func setupServer(t *testing.T, serverCreator func(http.Handler) *httptest.Server return server, port } -func tcpMonitorChecks(host string, ip string, port uint16, status string) mapval.Validator { - return hbtest.BaseChecks(ip, status, "tcp") -} - func TestUpEndpointJob(t *testing.T) { server, port := setupServer(t, httptest.NewServer) defer server.Close() @@ -174,7 +170,7 @@ func TestConnectionRefusedEndpointJob(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(ip, ip, port, "down"), + hbtest.BaseChecks(ip, "down", "tcp"), hbtest.SummaryChecks(0, 1), hbtest.SimpleURLChecks(t, "tcp", ip, port), hbtest.ErrorChecks(dialErr, "io"), @@ -192,7 +188,7 @@ func TestUnreachableEndpointJob(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(ip, ip, port, "down"), + hbtest.BaseChecks(ip, "down", "tcp"), hbtest.SummaryChecks(0, 1), hbtest.SimpleURLChecks(t, "tcp", ip, port), hbtest.ErrorChecks(dialErr, "io"), @@ -219,7 +215,7 @@ func TestCheckUp(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(host, ip, port, "up"), + hbtest.BaseChecks(ip, "up", "tcp"), hbtest.RespondingTCPChecks(), hbtest.SimpleURLChecks(t, "tcp", host, port), hbtest.SummaryChecks(1, 0), @@ -255,7 +251,7 @@ func TestCheckDown(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(host, ip, port, "down"), + hbtest.BaseChecks(ip, "down", "tcp"), hbtest.RespondingTCPChecks(), hbtest.SimpleURLChecks(t, "tcp", host, port), hbtest.SummaryChecks(0, 1), @@ -272,6 +268,22 @@ func TestCheckDown(t *testing.T) { "message": "received string mismatch", }, }), + )), event.Fields) +} + +func TestNXDomainJob(t *testing.T) { + host := "notadomainatallforsure.notadomain.notatldreally" + port := uint16(1234) + event := testTCPCheck(t, host, port) + + dialErr := fmt.Sprintf("lookup %s", host) + mapval.Test( + t, + mapval.Strict(mapval.Compose( + hbtest.BaseChecks("", "down", "tcp"), + hbtest.SummaryChecks(0, 1), + hbtest.SimpleURLChecks(t, "tcp", host, port), + hbtest.ErrorChecks(dialErr, "io"), )), event.Fields, ) diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 7f81ac8a455a..fa48113ec72a 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -100,54 +100,6 @@ func MakePingIPFactory( } } -// MakePingAllIPFactory wraps a function for building a recursive Task Runner from function callbacks. -func MakePingAllIPFactory( - f func(*net.IPAddr) []func(*beat.Event) error, -) func(*net.IPAddr) jobs.Job { - return func(ip *net.IPAddr) jobs.Job { - cont := f(ip) - switch len(cont) { - case 0: - return emptyTask - case 1: - return MakeSimpleCont(cont[0]) - } - - tasks := make([]jobs.Job, len(cont)) - for i, c := range cont { - tasks[i] = MakeSimpleCont(c) - } - return func(event *beat.Event) ([]jobs.Job, error) { - return tasks, nil - } - } -} - -// MakePingAllIPPortFactory builds a set of TaskRunner supporting a set of -// IP/port-pairs. -func MakePingAllIPPortFactory( - ports []uint16, - f func(*beat.Event, *net.IPAddr, uint16) error, -) func(*net.IPAddr) jobs.Job { - if len(ports) == 1 { - port := ports[0] - return MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error { - return f(event, ip, port) - }) - } - - return MakePingAllIPFactory(func(ip *net.IPAddr) []func(event *beat.Event) error { - funcs := make([]func(*beat.Event) error, len(ports)) - for i := range ports { - port := ports[i] - funcs[i] = func(event *beat.Event) error { - return f(event, ip, port) - } - } - return funcs - }) -} - // MakeByIPJob builds a new Job based on already known IP. Similar to // MakeByHostJob, the pingFactory will be used to build the tasks run by the job. //