Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 321 additions & 3 deletions x-pack/filebeat/tests/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,16 @@
http.port: %d
`

<<<<<<< HEAD

Check failure on line 57 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body
func TestFilebeatOTelE2E(t *testing.T) {
integration.EnsureESIsRunning(t)
numEvents := 1

=======

Check failure on line 62 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected ==, expected }
namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
fbOtelIndex := "logs-integration-" + namespace
fbIndex := "logs-filebeat-" + namespace
>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314))

Check failure on line 66 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

invalid character U+0023 '#'
// start filebeat in otel mode
filebeatOTel := integration.NewBeat(
t,
Expand All @@ -67,7 +73,7 @@
)

logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log")
filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, "logs-integration-default", 5066))
filebeatOTel.WriteConfigFile(fmt.Sprintf(beatsCfgFile, logFilePath, fbOtelIndex, 5066))
writeEventsToLogFile(t, logFilePath, numEvents)
filebeatOTel.Start()

Expand All @@ -79,11 +85,15 @@
)
logFilePath = filepath.Join(filebeat.TempDir(), "log.log")
writeEventsToLogFile(t, logFilePath, numEvents)
<<<<<<< HEAD
s := fmt.Sprintf(beatsCfgFile, logFilePath, "logs-filebeat-default", 5067)
s = s + `
setup.template.name: logs-filebeat-default
setup.template.pattern: logs-filebeat-default
`
=======
s := fmt.Sprintf(beatsCfgFile, logFilePath, fbIndex, 5067)
>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314))

Check failure on line 96 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

invalid character U+0023 '#'

filebeat.WriteConfigFile(s)
filebeat.Start()
Expand All @@ -105,15 +115,15 @@
var filebeatDocs estools.Documents
var otelDocs estools.Documents
// wait for logs to be published
require.Eventually(t,

Check failure on line 118 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body
func() bool {

Check failure on line 119 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected {, expected (

Check failure on line 119 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

method has no receiver
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)

Check failure on line 120 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected := in parameter list; possibly missing comma or )
defer findCancel()

otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-integration-default*")
otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbOtelIndex+"*")
require.NoError(t, err)

filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-logs-filebeat-default*")
filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbIndex+"*")
require.NoError(t, err)

return otelDocs.Hits.Total.Value >= numEvents && filebeatDocs.Hits.Total.Value >= numEvents
Expand Down Expand Up @@ -188,3 +198,311 @@
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code")
}
<<<<<<< HEAD

Check failure on line 201 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body
=======

func TestFilebeatOTelReceiverE2E(t *testing.T) {
integration.EnsureESIsRunning(t)
wantEvents := 1

// start filebeat in otel mode
filebeatOTel := integration.NewBeat(
t,
"filebeat-otel",
"../../filebeat.test",
"otel",
)

namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
fbReceiverIndex := "logs-integration-" + namespace
filebeatIndex := "logs-filebeat-" + namespace

otelConfig := struct {
Index string
MonitoringPort int
InputFile string
PathHome string
}{
Index: fbReceiverIndex,
MonitoringPort: 5066,
InputFile: filepath.Join(filebeatOTel.TempDir(), "log.log"),
PathHome: filebeatOTel.TempDir(),
}

cfg := `receivers:
filebeatreceiver:
filebeat:
inputs:
- type: filestream
id: filestream-fbreceiver
enabled: true
paths:
- {{.InputFile}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
logging:
level: info
selectors:
- '*'
queue.mem.flush.timeout: 0s
path.home: {{.PathHome}}
http.enabled: true
http.host: localhost
http.port: {{.MonitoringPort}}
exporters:
debug:
use_internal_logger: false
verbosity: detailed
elasticsearch/log:
endpoints:
- http://localhost:9200
compression: none
user: admin
password: testing
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
mapping:
mode: bodymap
service:
pipelines:
logs:
receivers:
- filebeatreceiver
exporters:
- elasticsearch/log
- debug
`
var configBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig))
configContents := configBuffer.Bytes()
t.Cleanup(func() {
if t.Failed() {
t.Logf("Config contents:\n%s", configContents)
}
})

filebeatOTel.WriteConfigFile(string(configContents))
writeEventsToLogFile(t, otelConfig.InputFile, wantEvents)
filebeatOTel.Start()
defer filebeatOTel.Stop()

// start filebeat
filebeat := integration.NewBeat(
t,
"filebeat",
"../../filebeat.test",
)

beatsCfgFile := `
filebeat.inputs:
- type: filestream
id: filestream-input-id
enabled: true
file_identity.native: ~
prospector.scanner.fingerprint.enabled: false
paths:
- %s
output:
elasticsearch:
hosts:
- localhost:9200
username: admin
password: testing
index: %s
queue.mem.flush.timeout: 0s
setup.template.enabled: false
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
setup.template.name: logs-filebeat-default
setup.template.pattern: logs-filebeat-default
http.enabled: true
http.host: localhost
http.port: %d
`
logFilePath := filepath.Join(filebeat.TempDir(), "log.log")
writeEventsToLogFile(t, logFilePath, wantEvents)
s := fmt.Sprintf(beatsCfgFile, logFilePath, filebeatIndex, 5067)
filebeat.WriteConfigFile(s)
filebeat.Start()
defer filebeat.Stop()

es := integration.GetESClient(t, "http")

var filebeatDocs estools.Documents
var otelDocs estools.Documents
var err error

// wait for logs to be published
require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer findCancel()

otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+fbReceiverIndex+"*")
require.NoError(t, err)

filebeatDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+filebeatIndex+"*")
require.NoError(t, err)

return otelDocs.Hits.Total.Value >= wantEvents && filebeatDocs.Hits.Total.Value >= wantEvents
},
2*time.Minute, 1*time.Second, "expected at least %d events, got filebeat: %d and otel: %d", wantEvents, filebeatDocs.Hits.Total.Value, otelDocs.Hits.Total.Value)

filebeatDoc := filebeatDocs.Hits.Hits[0].Source
otelDoc := otelDocs.Hits.Hits[0].Source
ignoredFields := []string{
// Expected to change between agentDocs and OtelDocs
"@timestamp",
"agent.ephemeral_id",
"agent.id",
"log.file.inode",
"log.file.path",
}

assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal")
assertMonitoring(t, otelConfig.MonitoringPort)
assertMonitoring(t, 5067) // filebeat
}

func TestFilebeatOTelMultipleReceiversE2E(t *testing.T) {
integration.EnsureESIsRunning(t)
wantEvents := 100

// start filebeat in otel mode
filebeatOTel := integration.NewBeat(
t,
"filebeat-otel",
"../../filebeat.test",
"otel",
)

// write events to log file
logFilePath := filepath.Join(filebeatOTel.TempDir(), "log.log")
writeEventsToLogFile(t, logFilePath, wantEvents)

type receiverConfig struct {
MonitoringPort int
InputFile string
PathHome string
}

namespace := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
otelConfig := struct {
Index string
Receivers []receiverConfig
}{
Index: "logs-integration-" + namespace,
Receivers: []receiverConfig{
{
MonitoringPort: 5066,
InputFile: logFilePath,
PathHome: filepath.Join(filebeatOTel.TempDir(), "r1"),
},
{
MonitoringPort: 5067,
InputFile: logFilePath,
PathHome: filepath.Join(filebeatOTel.TempDir(), "r2"),
},
},
}

cfg := `receivers:
{{range $i, $receiver := .Receivers}}
filebeatreceiver/{{$i}}:
filebeat:
inputs:
- type: filestream
id: filestream-fbreceiver
enabled: true
paths:
- {{$receiver.InputFile}}
prospector.scanner.fingerprint.enabled: false
file_identity.native: ~
output:
otelconsumer:
logging:
level: info
selectors:
- '*'
queue.mem.flush.timeout: 0s
path.home: {{$receiver.PathHome}}
{{if $receiver.MonitoringPort}}
http.enabled: true
http.host: localhost
http.port: {{$receiver.MonitoringPort}}
{{end}}
{{end}}
exporters:
debug:
use_internal_logger: false
verbosity: detailed
elasticsearch/log:
endpoints:
- http://localhost:9200
compression: none
user: admin
password: testing
logs_index: {{.Index}}
batcher:
enabled: true
flush_timeout: 1s
mapping:
mode: bodymap
service:
pipelines:
logs:
receivers:
{{range $i, $receiver := .Receivers}}
- filebeatreceiver/{{$i}}
{{end}}
exporters:
- debug
- elasticsearch/log
`
var configBuffer bytes.Buffer
require.NoError(t,
template.Must(template.New("config").Parse(cfg)).Execute(&configBuffer, otelConfig))
configContents := configBuffer.Bytes()

t.Cleanup(func() {
if t.Failed() {
t.Logf("Config contents:\n%s", configContents)
}
})

filebeatOTel.WriteConfigFile(string(configContents))
writeEventsToLogFile(t, logFilePath, wantEvents)
filebeatOTel.Start()
defer filebeatOTel.Stop()

es := integration.GetESClient(t, "http")

var otelDocs estools.Documents
var err error

// wait for logs to be published
wantTotalLogs := wantEvents * len(otelConfig.Receivers)
require.Eventuallyf(t,
func() bool {
findCtx, findCancel := context.WithTimeout(t.Context(), 10*time.Second)
defer findCancel()

otelDocs, err = estools.GetAllLogsForIndexWithContext(findCtx, es, ".ds-"+otelConfig.Index+"*")
require.NoError(t, err)

return otelDocs.Hits.Total.Value >= wantTotalLogs
},
2*time.Minute, 100*time.Millisecond, "expected at least %d events, got %d", wantTotalLogs, otelDocs.Hits.Total.Value)
for _, rec := range otelConfig.Receivers {
assertMonitoring(t, rec.MonitoringPort)
}
}
>>>>>>> ea98564a4 (otel: update filebeat integration tests to run in isolation (#45314))

Check failure on line 508 in x-pack/filebeat/tests/integration/otel_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body
Loading