Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,17 @@ func TLSChecks(chainIndex, certIndex int, certificate *x509.Certificate) mapval.

// BaseChecks creates a skima.Validator that represents the "monitor" field present
// in all heartbeat events.
// If IP is set to "" this will check that the field is not present
func BaseChecks(ip string, status string, typ string) mapval.Validator {
var ipCheck mapval.IsDef
if len(ip) > 0 {
ipCheck = mapval.IsEqual(ip)
} else {
ipCheck = mapval.Optional(mapval.IsEqual(ip))
}
return mapval.MustCompile(mapval.Map{
"monitor": mapval.Map{
"ip": ip,
"ip": ipCheck,
"duration.us": mapval.IsDuration,
"status": status,
"id": mapval.IsNonEmptyString,
Expand Down
57 changes: 30 additions & 27 deletions heartbeat/monitors/active/dialchain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package dialchain

import (
"fmt"
"net"
"strconv"
"net/url"
"time"

"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/transport"
)
Expand Down Expand Up @@ -136,59 +138,60 @@ func MakeDialerJobs(
) ([]jobs.Job, error) {
var jobs []jobs.Job
for _, endpoint := range endpoints {
endpointJobs, err := makeEndpointJobs(b, scheme, endpoint, mode, fn)
if err != nil {
return nil, err
for _, port := range endpoint.Ports {
endpointURL, err := url.Parse(fmt.Sprintf("%s://%s:%d", scheme, endpoint.Host, port))
if err != nil {
return nil, err
}
endpointJob, err := makeEndpointJob(b, endpointURL, mode, fn)
if err != nil {
return nil, err
}
jobs = append(jobs, wrappers.WithURLField(endpointURL, endpointJob))
}
jobs = append(jobs, endpointJobs...)

}

return jobs, nil
}

func makeEndpointJobs(
func makeEndpointJob(
b *Builder,
scheme string,
endpoint Endpoint,
endpointURL *url.URL,
mode monitors.IPSettings,
fn func(*beat.Event, transport.Dialer, string) error,
) ([]jobs.Job, error) {
) (jobs.Job, error) {

// Check if SOCKS5 is configured, with relying on the socks5 proxy
// in resolving the actual IP.
// Create one job for every port number configured.
if b.resolveViaSocks5 {
js := make([]jobs.Job, len(endpoint.Ports))
for i, port := range endpoint.Ports {
address := net.JoinHostPort(endpoint.Host, strconv.Itoa(int(port)))
js[i] = jobs.MakeSimpleJob(func(event *beat.Event) error {
return b.Run(event, address, func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, address)
return wrappers.WithURLField(endpointURL,
jobs.MakeSimpleJob(func(event *beat.Event) error {
hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port())
return b.Run(event, hostPort, func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, hostPort)
})
})
}
return js, nil
})), nil
}

// Create job that first resolves one or multiple IP (depending on
// config.Mode) in order to create one continuation Task per IP.
settings := monitors.MakeHostJobSettings(endpoint.Host, mode)
settings := monitors.MakeHostJobSettings(endpointURL.Hostname(), mode)

job, err := monitors.MakeByHostJob(settings,
monitors.MakePingAllIPPortFactory(endpoint.Ports,
func(event *beat.Event, ip *net.IPAddr, port uint16) error {
monitors.MakePingIPFactory(
func(event *beat.Event, ip *net.IPAddr) error {
// use address from resolved IP
portStr := strconv.Itoa(int(port))
ipAddr := net.JoinHostPort(ip.String(), portStr)
hostAddr := net.JoinHostPort(endpoint.Host, portStr)
ipPort := net.JoinHostPort(ip.String(), endpointURL.Port())
cb := func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, hostAddr)
return fn(event, dialer, ipPort)
}
err := b.Run(event, ipAddr, cb)
err := b.Run(event, ipPort, cb)
return err
}))
if err != nil {
return nil, err
}
return []jobs.Job{job}, nil
return job, nil
}
9 changes: 0 additions & 9 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"strconv"
"strings"

"github.com/elastic/beats/heartbeat/eventext"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/monitors/active/dialchain"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -91,13 +89,6 @@ func create(

epJobs, err := dialchain.MakeDialerJobs(db, scheme, eps, config.Mode,
func(event *beat.Event, dialer transport.Dialer, addr string) error {
u, err := url.Parse(fmt.Sprintf("%s://%s", scheme, addr))
if err != nil {
return err
}

eventext.MergeEventFields(event, common.MapStr{"url": wrappers.URLFields(u)})

return pingHost(event, dialer, addr, timeout, validator)
})
if err != nil {
Expand Down
28 changes: 20 additions & 8 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ func setupServer(t *testing.T, serverCreator func(http.Handler) *httptest.Server
return server, port
}

func tcpMonitorChecks(host string, ip string, port uint16, status string) mapval.Validator {
return hbtest.BaseChecks(ip, status, "tcp")
}

func TestUpEndpointJob(t *testing.T) {
server, port := setupServer(t, httptest.NewServer)
defer server.Close()
Expand Down Expand Up @@ -174,7 +170,7 @@ func TestConnectionRefusedEndpointJob(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(ip, ip, port, "down"),
hbtest.BaseChecks(ip, "down", "tcp"),
hbtest.SummaryChecks(0, 1),
hbtest.SimpleURLChecks(t, "tcp", ip, port),
hbtest.ErrorChecks(dialErr, "io"),
Expand All @@ -192,7 +188,7 @@ func TestUnreachableEndpointJob(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(ip, ip, port, "down"),
hbtest.BaseChecks(ip, "down", "tcp"),
hbtest.SummaryChecks(0, 1),
hbtest.SimpleURLChecks(t, "tcp", ip, port),
hbtest.ErrorChecks(dialErr, "io"),
Expand All @@ -219,7 +215,7 @@ func TestCheckUp(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(host, ip, port, "up"),
hbtest.BaseChecks(ip, "up", "tcp"),
hbtest.RespondingTCPChecks(),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.SummaryChecks(1, 0),
Expand Down Expand Up @@ -255,7 +251,7 @@ func TestCheckDown(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(host, ip, port, "down"),
hbtest.BaseChecks(ip, "down", "tcp"),
hbtest.RespondingTCPChecks(),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.SummaryChecks(0, 1),
Expand All @@ -272,6 +268,22 @@ func TestCheckDown(t *testing.T) {
"message": "received string mismatch",
},
}),
)), event.Fields)
}

func TestNXDomainJob(t *testing.T) {
host := "notadomainatallforsure.notadomain.notatldreally"
port := uint16(1234)
event := testTCPCheck(t, host, port)

dialErr := fmt.Sprintf("lookup %s", host)
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks("", "down", "tcp"),
hbtest.SummaryChecks(0, 1),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.ErrorChecks(dialErr, "io"),
)),
event.Fields,
)
Expand Down
48 changes: 0 additions & 48 deletions heartbeat/monitors/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,54 +100,6 @@ func MakePingIPFactory(
}
}

// MakePingAllIPFactory wraps a function for building a recursive Task Runner from function callbacks.
func MakePingAllIPFactory(
f func(*net.IPAddr) []func(*beat.Event) error,
) func(*net.IPAddr) jobs.Job {
return func(ip *net.IPAddr) jobs.Job {
cont := f(ip)
switch len(cont) {
case 0:
return emptyTask
case 1:
return MakeSimpleCont(cont[0])
}

tasks := make([]jobs.Job, len(cont))
for i, c := range cont {
tasks[i] = MakeSimpleCont(c)
}
return func(event *beat.Event) ([]jobs.Job, error) {
return tasks, nil
}
}
}

// MakePingAllIPPortFactory builds a set of TaskRunner supporting a set of
// IP/port-pairs.
func MakePingAllIPPortFactory(
ports []uint16,
f func(*beat.Event, *net.IPAddr, uint16) error,
) func(*net.IPAddr) jobs.Job {
if len(ports) == 1 {
port := ports[0]
return MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error {
return f(event, ip, port)
})
}

return MakePingAllIPFactory(func(ip *net.IPAddr) []func(event *beat.Event) error {
funcs := make([]func(*beat.Event) error, len(ports))
for i := range ports {
port := ports[i]
funcs[i] = func(event *beat.Event) error {
return f(event, ip, port)
}
}
return funcs
})
}

// MakeByIPJob builds a new Job based on already known IP. Similar to
// MakeByHostJob, the pingFactory will be used to build the tasks run by the job.
//
Expand Down