Skip to content

Commit

Permalink
Add simplified integration framework (#42450)
Browse files Browse the repository at this point in the history
Add simplified integration framework

That can be used for running Beats binaries in integration tests.

This framework has a simplified API comparing to the existing one and
uses a more efficient way to search for logs in the output of the command.
  • Loading branch information
rdner authored Feb 11, 2025
1 parent 6d60672 commit a6ab04f
Show file tree
Hide file tree
Showing 8 changed files with 1,524 additions and 0 deletions.
77 changes: 77 additions & 0 deletions filebeat/testing/integration/integration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package integration

import (
"context"
"fmt"
"testing"

"github.com/elastic/beats/v7/libbeat/testing/integration"
)

// EnsureCompiled ensures that Filebeat is compiled and ready
// to run.
func EnsureCompiled(ctx context.Context, t *testing.T) (path string) {
return integration.EnsureCompiled(ctx, t, "filebeat")
}

// Test describes all operations for testing Filebeat
//
// Due to interface composition all Filebeat-specific functions
// must be used first in the call-chain.
type Test interface {
integration.BeatTest
// ExpectEOF sets an expectation that Filebeat will read the given
// files to EOF.
ExpectEOF(...string) Test
}

// TestOptions describes all available options for the test.
type TestOptions struct {
// Config for the Beat written in YAML
Config string
// Args sets additional arguments to pass when running the binary.
Args []string
}

// NewTest creates a new integration test for Filebeat.
func NewTest(t *testing.T, opts TestOptions) Test {
return &test{
BeatTest: integration.NewBeatTest(t, integration.BeatTestOptions{
Beatname: "filebeat",
Config: opts.Config,
Args: opts.Args,
}),
}
}

type test struct {
integration.BeatTest
}

// ExpectEOF implements the Test interface.
func (fbt *test) ExpectEOF(files ...string) Test {
// Ensuring we completely ingest every file
for _, filename := range files {
line := fmt.Sprintf("End of file reached: %s; Backoff now.", filename)
fbt.ExpectOutput(line)
}

return fbt
}
137 changes: 137 additions & 0 deletions filebeat/testing/integration/log_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package integration

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"

uuid "github.com/gofrs/uuid/v5"
)

// LogGenerator used for generating log files
type LogGenerator interface {
// GenerateLine generates a single line for a log file.
// Expected no new line character at the end.
GenerateLine(filename string, index int) string
// FileExtension defines the extension of the new file where
// the generated lines are written.
FileExtension() string
}

// NewPlainTextGenerator creates is a simple plain text generator.
//
// It's using the given message prefix following by the filename
// and the line number, e.g. `filename:128`
func NewPlainTextGenerator(prefix string) LogGenerator {
return plainTextGenerator{
prefix: prefix,
}
}

type plainTextGenerator struct {
prefix string
}

func (g plainTextGenerator) GenerateLine(filename string, index int) string {
return fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index)
}

func (g plainTextGenerator) FileExtension() string {
return ".log"
}

// JSONLineGenerator creates a JSON log line generator.
// Forms a JSON object with a message
// prefixed by the given prefix and followed by the filename
// and the line number, e.g. `filename:128`
func NewJSONGenerator(prefix string) LogGenerator {
return jsonGenerator{
prefix: prefix,
}
}

type jsonGenerator struct {
prefix string
}

func (g jsonGenerator) GenerateLine(filename string, index int) string {
message := fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index)

line := struct{ Message string }{Message: message}
bytes, _ := json.Marshal(line)
return string(bytes)
}

func (g jsonGenerator) FileExtension() string {
return ".ndjson"
}

// GenerateLogFiles generate given amount of files with given
// amount of lines in them.
//
// Returns the path value to put in the Filebeat configuration and
// filenames for all created files.
func GenerateLogFiles(t *testing.T, files, lines int, generator LogGenerator) (path string, filenames []string) {
t.Logf("generating %d log files with %d lines each...", files, lines)
logsPath := filepath.Join(t.TempDir(), "logs")
err := os.MkdirAll(logsPath, 0777)
if err != nil {
t.Fatalf("failed to create a directory for logs %q: %s", logsPath, err)
return "", nil
}

filenames = make([]string, 0, files)
for i := 0; i < files; i++ {
id, err := uuid.NewV4()
if err != nil {
t.Fatalf("failed to generate a unique filename: %s", err)
return "", nil
}
filename := filepath.Join(logsPath, id.String()+generator.FileExtension())
filenames = append(filenames, filename)
GenerateLogFile(t, filename, lines, generator)
}

t.Logf("finished generating %d log files with %d lines each", files, lines)

return filepath.Join(logsPath, "*"+generator.FileExtension()), filenames
}

// GenerateLogFile generates a single log file with the given full
// filename, amount of lines using the given generator
// to create each line.
func GenerateLogFile(t *testing.T, filename string, lines int, generator LogGenerator) {
file, err := os.Create(filename)
if err != nil {
t.Fatalf("failed to create a log file: %q", filename)
return
}
defer file.Close()
for i := 1; i <= lines; i++ {
line := generator.GenerateLine(filename, i) + "\n"
_, err := file.WriteString(line)
if err != nil {
t.Fatalf("cannot write a generated log line to %s", filename)
return
}
}
}
139 changes: 139 additions & 0 deletions filebeat/testing/integration/sample_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package integration

import (
"context"
"fmt"
"path/filepath"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/testing/integration"
)

func TestFilebeat(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
EnsureCompiled(ctx, t)

messagePrefix := "sample test message"
fileCount := 5
lineCount := 128

reportOptions := integration.ReportOptions{
PrintLinesOnFail: 10,
}

t.Run("Filebeat starts and ingests files", func(t *testing.T) {
configTemplate := `
filebeat.inputs:
- type: filestream
id: "test-filestream"
paths:
- %s
# we want to check that all messages are ingested
# without using an external service, this is an easy way
output.console:
enabled: true
`
// we can generate any amount of expectations
// they are light-weight
expectIngestedFiles := func(test Test, files []string) {
// ensuring we ingest every line from every file
for _, filename := range files {
for i := 1; i <= lineCount; i++ {
line := fmt.Sprintf("%s %s:%d", messagePrefix, filepath.Base(filename), i)
test.ExpectOutput(line)
}
}
}

t.Run("plain text files", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

generator := NewPlainTextGenerator(messagePrefix)
path, files := GenerateLogFiles(t, fileCount, lineCount, generator)
config := fmt.Sprintf(configTemplate, path)
test := NewTest(t, TestOptions{
Config: config,
})

expectIngestedFiles(test, files)

test.
// we expect to read all generated files to EOF
ExpectEOF(files...).
WithReportOptions(reportOptions).
// we should observe the start message of the Beat
ExpectStart().
// check that the first and the last line of the file get ingested
Start(ctx).
// wait until all the expectations are met
// or we hit the timeout set by the context
Wait()
})

t.Run("JSON files", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

generator := NewJSONGenerator(messagePrefix)
path, files := GenerateLogFiles(t, fileCount, lineCount, generator)
config := fmt.Sprintf(configTemplate, path)
test := NewTest(t, TestOptions{
Config: config,
})

expectIngestedFiles(test, files)

test.
ExpectEOF(files...).
WithReportOptions(reportOptions).
ExpectStart().
Start(ctx).
Wait()
})
})

t.Run("Filebeat crashes due to incorrect config", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// path items are required, this config is invalid
config := `
filebeat.inputs:
- type: filestream
id: "test-filestream"
output.console:
enabled: true
`
test := NewTest(t, TestOptions{
Config: config,
})

test.
WithReportOptions(reportOptions).
ExpectStart().
ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured").
ExpectStop(1).
Start(ctx).
Wait()
})
}
Loading

0 comments on commit a6ab04f

Please sign in to comment.