
Cleanup Filebeat code (#1975)
* Cleanup variable names in filebeat.yml.j2
* Cleanup spooler
* Unify variable names
ruflin authored and Steffen Siering committed Jul 12, 2016
Parent: 54e25b8 · Commit: f1ed821
Showing 14 changed files with 188 additions and 190 deletions.

17 changes: 2 additions & 15 deletions filebeat/crawler/crawler.go

```diff
@@ -11,23 +11,11 @@ import (
 	"github.com/elastic/beats/libbeat/logp"
 )
 
-/*
- The hierarchy for the crawler objects is explained as following
- Crawler: Filebeat has one crawler. The crawler is the single point of control
- 	and stores the state. The state is written through the registrar
- Prospector: For every FileConfig the crawler starts a prospector
- Harvester: For every file found inside the FileConfig, the Prospector starts a Harvester
- 	The harvester send their events to the spooler
- 	The spooler sends the event to the publisher
- 	The publisher writes the state down with the registrar
-*/
-
 type Crawler struct {
 	prospectors       []*prospector.Prospector
-	wg                sync.WaitGroup
-	spooler           *spooler.Spooler
 	prospectorConfigs []*common.Config
+	spooler           *spooler.Spooler
+	wg                sync.WaitGroup
 }
 
 func New(spooler *spooler.Spooler, prospectorConfigs []*common.Config) (*Crawler, error) {
@@ -58,7 +46,6 @@ func (c *Crawler) Start(states file.States) error {
 
 	logp.Info("Loading Prospectors completed. Number of prospectors: %v", len(c.prospectors))
 
-	c.wg = sync.WaitGroup{}
	for i, p := range c.prospectors {
 		c.wg.Add(1)
```
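
The deleted block comment described the component hierarchy: one Crawler drives everything, a Prospector per config discovers files, a Harvester per file reads lines, and events funnel through the Spooler to the publisher and registrar. (Dropping `c.wg = sync.WaitGroup{}` in Start is also safe because a zero-value WaitGroup is ready to use.) Below is a minimal sketch of that fan-in shape using plain channels and hypothetical names — illustrative only, not Filebeat's actual types:

```go
package main

import "fmt"

// Event is a hypothetical stand-in for Filebeat's input.FileEvent.
type Event struct {
	Source string
	Text   string
}

// harvest plays the Harvester role: one per file, all feeding the
// shared spooler channel.
func harvest(path string, lines []string, spool chan<- Event) {
	for _, l := range lines {
		spool <- Event{Source: path, Text: l}
	}
}

func main() {
	spool := make(chan Event, 16) // the Spooler's input

	// Prospector role: start a harvester for every discovered file.
	files := map[string][]string{
		"a.log": {"line 1", "line 2"},
		"b.log": {"hello"},
	}
	go func() {
		for path, lines := range files {
			harvest(path, lines, spool)
		}
		close(spool)
	}()

	// Spooler/publisher role: drain events and hand them downstream,
	// where the registrar would then persist the resulting state.
	for ev := range spool {
		fmt.Printf("publish %s: %s\n", ev.Source, ev.Text)
	}
}
```
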
2 changes: 1 addition & 1 deletion filebeat/harvester/config.go

```diff
@@ -23,7 +23,7 @@ var (
 	BackoffFactor: 2,
 	MaxBackoff:    10 * time.Second,
 	CloseOlder:    1 * time.Hour,
-	MaxBytes:      10 * (1 << 20), // 10MB
+	MaxBytes:      10 * humanize.MiByte,
 	CloseRemoved:  false,
 	CloseRenamed:  false,
 	CloseEOF:      false,
```
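
`humanize.MiByte` presumably refers to the IEC size constants in github.com/dustin/go-humanize, which spell out what the `10 * (1 << 20)` literal meant. A quick sanity check, assuming that package:

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// go-humanize defines IEC constants: KiByte = 1 << 10, MiByte = 1 << 20, ...
	oldVal := 10 * (1 << 20)       // the literal the diff removes
	newVal := 10 * humanize.MiByte // the named constant it adds

	fmt.Println(oldVal == newVal)                // true
	fmt.Println(humanize.IBytes(uint64(newVal))) // "10 MiB"
}
```
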
18 changes: 7 additions & 11 deletions filebeat/harvester/harvester.go

```diff
@@ -31,11 +31,12 @@ type Harvester struct {
 	offset         int64
 	state          file.State
 	prospectorChan chan *input.FileEvent
-	encoding       encoding.EncodingFactory
 	file           source.FileSource /* the file being watched */
-	ExcludeLinesRegexp []*regexp.Regexp
-	IncludeLinesRegexp []*regexp.Regexp
 	done            chan struct{}
+	encodingFactory encoding.EncodingFactory
+	encoding        encoding.Encoding
 }
 
 func NewHarvester(
@@ -59,30 +60,25 @@ func NewHarvester(
 	if err := cfg.Unpack(&h.config); err != nil {
 		return nil, err
 	}
 	if err := h.config.Validate(); err != nil {
 		return nil, err
 	}
 
-	encoding, ok := encoding.FindEncoding(h.config.Encoding)
-	if !ok || encoding == nil {
+	encodingFactory, ok := encoding.FindEncoding(h.config.Encoding)
+	if !ok || encodingFactory == nil {
 		return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
 	}
-	h.encoding = encoding
+	h.encodingFactory = encodingFactory
 
-	h.ExcludeLinesRegexp = h.config.ExcludeLines
-	h.IncludeLinesRegexp = h.config.IncludeLines
 	return h, nil
 }
 
 // open does open the file given under h.Path and assigns the file handler to h.file
-func (h *Harvester) open() (encoding.Encoding, error) {
+func (h *Harvester) open() error {
 
 	switch h.config.InputType {
 	case config.StdinInputType:
 		return h.openStdin()
 	case config.LogInputType:
 		return h.openFile()
 	default:
-		return nil, fmt.Errorf("Invalid input type")
+		return fmt.Errorf("Invalid input type")
 	}
 }
```
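
The rename makes the two phases explicit: `FindEncoding` resolves a factory from the configured name once, in `NewHarvester`, and the factory is applied to the file handle later, in `open()`. A sketch of that factory pattern with hypothetical stand-in types (the real `encoding` package is richer):

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Encoding and EncodingFactory are hypothetical stand-ins: the factory
// is resolved once by name, then invoked per opened file.
type Encoding interface{ Name() string }

type EncodingFactory func(r io.Reader) (Encoding, error)

type plain struct{}

func (plain) Name() string { return "plain" }

var registry = map[string]EncodingFactory{
	"plain": func(io.Reader) (Encoding, error) { return plain{}, nil },
}

// FindEncoding mirrors the lookup in NewHarvester: resolve by name once.
func FindEncoding(name string) (EncodingFactory, bool) {
	f, ok := registry[strings.ToLower(name)]
	return f, ok
}

func main() {
	// Phase 1 (construction time): look up the factory.
	encodingFactory, ok := FindEncoding("plain")
	if !ok || encodingFactory == nil {
		panic("unknown encoding")
	}

	// Phase 2 (open() time): apply the factory to the real reader.
	enc, err := encodingFactory(strings.NewReader("log line\n"))
	if err != nil {
		panic(err)
	}
	fmt.Println(enc.Name()) // plain
}
```

Keeping both fields on the struct presumably lets `open()` be retried without re-resolving the configured name.
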
97 changes: 43 additions & 54 deletions filebeat/harvester/log.go

```diff
@@ -9,7 +9,6 @@ import (
 	"io"
 
 	"github.com/elastic/beats/filebeat/config"
-	"github.com/elastic/beats/filebeat/harvester/encoding"
 	"github.com/elastic/beats/filebeat/harvester/processor"
 	"github.com/elastic/beats/filebeat/harvester/reader"
 	"github.com/elastic/beats/filebeat/harvester/source"
@@ -26,31 +25,15 @@ func (h *Harvester) Harvest() {
 
 	h.state.Finished = false
 
-	enc, err := h.open()
+	err := h.open()
 	if err != nil {
 		logp.Err("Stop Harvesting. Unexpected file opening error: %s", err)
 		return
 	}
 
 	logp.Info("Harvester started for file: %s", h.path)
 
-	// TODO: NewLineReader uses additional buffering to deal with encoding and testing
-	// for new lines in input stream. Simple 8-bit based encodings, or plain
-	// don't require 'complicated' logic.
-	cfg := h.config
-	readerConfig := reader.LogFileReaderConfig{
-		CloseRemoved:       cfg.CloseRemoved,
-		CloseRenamed:       cfg.CloseRenamed,
-		CloseOlder:         cfg.CloseOlder,
-		CloseEOF:           cfg.CloseEOF,
-		BackoffDuration:    cfg.Backoff,
-		MaxBackoffDuration: cfg.MaxBackoff,
-		BackoffFactor:      cfg.BackoffFactor,
-	}
-
-	processor, err := createLineProcessor(
-		h.file, enc, cfg.BufferSize, cfg.MaxBytes, readerConfig,
-		cfg.JSON, cfg.Multiline, h.done)
+	processor, err := h.newLineProcessor()
 	if err != nil {
 		logp.Err("Stop Harvesting. Unexpected encoding line reader error: %s", err)
 		return
@@ -62,7 +45,6 @@
 	}
 
 	for {
-
 		select {
 		case <-h.done:
 			return
@@ -101,6 +83,7 @@
 	}
 
 	// Always send event to update state, also if lines was skipped
+	// Stop harvester in case of an error
 	if !h.sendEvent(event) {
 		return
 	}
@@ -140,15 +123,15 @@ func (h *Harvester) sendEvent(event *input.FileEvent) bool {
 
 // shouldExportLine decides if the line is exported or not based on
 // the include_lines and exclude_lines options.
 func (h *Harvester) shouldExportLine(line string) bool {
-	if len(h.IncludeLinesRegexp) > 0 {
-		if !MatchAnyRegexps(h.IncludeLinesRegexp, line) {
+	if len(h.config.IncludeLines) > 0 {
+		if !MatchAnyRegexps(h.config.IncludeLines, line) {
 			// drop line
 			logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line)
 			return false
 		}
 	}
-	if len(h.ExcludeLinesRegexp) > 0 {
-		if MatchAnyRegexps(h.ExcludeLinesRegexp, line) {
+	if len(h.config.ExcludeLines) > 0 {
+		if MatchAnyRegexps(h.config.ExcludeLines, line) {
 			// drop line
 			logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line)
 			return false
```
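
With the regexp slices removed from the struct, the filter reads its patterns directly from the unpacked config: include_lines is applied first, then exclude_lines. A self-contained sketch of the same logic, with a stand-in for `MatchAnyRegexps`:

```go
package main

import (
	"fmt"
	"regexp"
)

func matchAny(patterns []*regexp.Regexp, line string) bool {
	for _, p := range patterns {
		if p.MatchString(line) {
			return true
		}
	}
	return false
}

// shouldExport mirrors shouldExportLine: include_lines is applied first,
// then exclude_lines; an empty list means "no filtering" for that side.
func shouldExport(line string, include, exclude []*regexp.Regexp) bool {
	if len(include) > 0 && !matchAny(include, line) {
		return false // drop: matches no include pattern
	}
	if len(exclude) > 0 && matchAny(exclude, line) {
		return false // drop: matches an exclude pattern
	}
	return true
}

func main() {
	include := []*regexp.Regexp{regexp.MustCompile(`ERR|WARN`)}
	exclude := []*regexp.Regexp{regexp.MustCompile(`heartbeat`)}

	fmt.Println(shouldExport("ERR disk full", include, exclude))       // true
	fmt.Println(shouldExport("INFO started", include, exclude))        // false
	fmt.Println(shouldExport("WARN heartbeat slow", include, exclude)) // false
}
```
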
```diff
@@ -163,66 +146,63 @@ func (h *Harvester) shouldExportLine(line string) bool {
 
 // or the file cannot be opened because for example of failing read permissions, an error
 // is returned and the harvester is closed. The file will be picked up again the next time
 // the file system is scanned
-func (h *Harvester) openFile() (encoding.Encoding, error) {
-	var encoding encoding.Encoding
+func (h *Harvester) openFile() error {
 
 	f, err := file.ReadOpen(h.path)
 	if err != nil {
 		logp.Err("Failed opening %s: %s", h.path, err)
-		return nil, err
+		return err
 	}
 
 	// Check we are not following a rabbit hole (symlinks, etc.)
 	if !file.IsRegular(f) {
-		return nil, errors.New("Given file is not a regular file.")
+		return errors.New("Given file is not a regular file.")
 	}
 
 	info, err := f.Stat()
 	if err != nil {
 		logp.Err("Failed getting stats for file %s: %s", h.path, err)
-		return nil, err
+		return err
 	}
 	// Compares the stat of the opened file to the state given by the prospector. Abort if not match.
 	if !os.SameFile(h.state.Fileinfo, info) {
-		return nil, errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
+		return errors.New("File info is not identical with opened file. Aborting harvesting and retrying file later again.")
 	}
 
-	encoding, err = h.encoding(f)
+	h.encoding, err = h.encodingFactory(f)
 	if err != nil {
 
 		if err == transform.ErrShortSrc {
 			logp.Info("Initialising encoding for '%v' failed due to file being too short", f)
 		} else {
 			logp.Err("Initialising encoding for '%v' failed: %v", f, err)
 		}
-		return nil, err
+		return err
 	}
 
 	// update file offset
 	err = h.initFileOffset(f)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	// yay, open file
 	h.file = source.File{f}
-	return encoding, nil
+	return nil
 }
```

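
The `os.SameFile` guard above aborts the harvester when the path now points at a different file than the one recorded in the prospector state — typically after rotation. A standalone stdlib demonstration of how stat identity survives a rename:

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "samefile-demo")
	if err != nil {
		panic(err)
	}
	rotated := f.Name() + ".1"
	defer os.Remove(rotated)
	defer f.Close()

	before, err := f.Stat()
	if err != nil {
		panic(err)
	}

	// Rename the file, as log rotation would.
	if err := os.Rename(f.Name(), rotated); err != nil {
		panic(err)
	}
	after, err := os.Stat(rotated)
	if err != nil {
		panic(err)
	}

	// Same device/inode: the handle still points at the same file, so a
	// harvester could keep reading it; a *new* file at the old path would
	// fail this check and be aborted, as in openFile above.
	fmt.Println(os.SameFile(before, after)) // true
}
```
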
```diff
 func (h *Harvester) initFileOffset(file *os.File) error {
 	offset, err := file.Seek(0, os.SEEK_CUR)
 
 	if h.getOffset() > 0 {
 		// continue from last known offset
 
 		logp.Debug("harvester",
 			"harvest: %q position:%d (offset snapshot:%d)", h.path, h.getOffset(), offset)
 		_, err = file.Seek(h.getOffset(), os.SEEK_SET)
 	} else if h.config.TailFiles {
 		// tail file if file is new and tail_files config is set
-		logp.Debug("harvester", "harvest: (tailing) %q (offset snapshot:%d)", h.path, offset)
+
+		logp.Debug("harvester",
+			"harvest: (tailing) %q (offset snapshot:%d)", h.path, offset)
 		offset, err = file.Seek(0, os.SEEK_END)
 		h.SetOffset(offset)
 
```
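
`initFileOffset` picks between the two start modes: seek to the registrar's offset (`os.SEEK_SET`) to resume, or to the end of the file (`os.SEEK_END`) when `tail_files` is set. The same `Seek` whence semantics in isolation:

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "offset-demo")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	f.WriteString("0123456789") // 10 bytes already in the file

	// Snapshot the current position (what the debug logging above records).
	// (Modern code would use io.SeekStart/io.SeekCurrent/io.SeekEnd.)
	cur, _ := f.Seek(0, os.SEEK_CUR)
	fmt.Println("offset snapshot:", cur) // 10

	// Resume branch (getOffset() > 0): continue from a registrar offset.
	pos, _ := f.Seek(4, os.SEEK_SET)
	fmt.Println("resume at:", pos) // 4

	// tail_files branch: start reading at the current end of file.
	end, _ := f.Seek(0, os.SEEK_END)
	fmt.Println("tail from:", end) // 10
}
```
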
```diff
@@ -290,40 +270,49 @@ func (h *Harvester) close() {
 	}
 }
 
-func createLineProcessor(
-	in source.FileSource,
-	codec encoding.Encoding,
-	bufferSize int,
-	maxBytes int,
-	readerConfig reader.LogFileReaderConfig,
-	jsonConfig *processor.JSONConfig,
-	mlrConfig *processor.MultilineConfig,
-	done chan struct{},
-) (processor.LineProcessor, error) {
+func (h *Harvester) newLogFileReaderConfig() reader.LogFileReaderConfig {
+	// TODO: NewLineReader uses additional buffering to deal with encoding and testing
+	// for new lines in input stream. Simple 8-bit based encodings, or plain
+	// don't require 'complicated' logic.
+	return reader.LogFileReaderConfig{
+		CloseRemoved:  h.config.CloseRemoved,
+		CloseRenamed:  h.config.CloseRenamed,
+		CloseOlder:    h.config.CloseOlder,
+		CloseEOF:      h.config.CloseEOF,
+		Backoff:       h.config.Backoff,
+		MaxBackoff:    h.config.MaxBackoff,
+		BackoffFactor: h.config.BackoffFactor,
+	}
+}
+
+func (h *Harvester) newLineProcessor() (processor.LineProcessor, error) {
 
+	readerConfig := h.newLogFileReaderConfig()
+
 	var p processor.LineProcessor
 	var err error
 
-	fileReader, err := reader.NewLogFileReader(in, readerConfig, done)
+	fileReader, err := reader.NewLogFileReader(h.file, readerConfig, h.done)
 	if err != nil {
 		return nil, err
 	}
 
-	p, err = processor.NewLineEncoder(fileReader, codec, bufferSize)
+	p, err = processor.NewLineEncoder(fileReader, h.encoding, h.config.BufferSize)
 	if err != nil {
 		return nil, err
 	}
 
-	if jsonConfig != nil {
-		p = processor.NewJSONProcessor(p, jsonConfig)
+	if h.config.JSON != nil {
+		p = processor.NewJSONProcessor(p, h.config.JSON)
 	}
 
 	p = processor.NewStripNewline(p)
-	if mlrConfig != nil {
-		p, err = processor.NewMultiline(p, "\n", maxBytes, mlrConfig)
+	if h.config.Multiline != nil {
+		p, err = processor.NewMultiline(p, "\n", h.config.MaxBytes, h.config.Multiline)
 		if err != nil {
 			return nil, err
 		}
 	}
 
-	return processor.NewLimitProcessor(p, maxBytes), nil
+	return processor.NewLimitProcessor(p, h.config.MaxBytes), nil
 }
```
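
`newLineProcessor` assembles the read path by wrapping one `LineProcessor` in the next: log file reader → line encoder → optional JSON decoding → newline stripping → optional multiline joining → byte limit. A hedged sketch of that decorator chaining with deliberately simplified interfaces (the real `processor` package signatures differ):

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// LineProcessor is a simplified stand-in for processor.LineProcessor.
type LineProcessor interface {
	Next() (string, error)
}

// sliceSource feeds fixed lines, standing in for the log file reader.
type sliceSource struct {
	lines []string
	i     int
}

func (s *sliceSource) Next() (string, error) {
	if s.i >= len(s.lines) {
		return "", io.EOF
	}
	s.i++
	return s.lines[s.i-1], nil
}

// stripNewline decorates another processor, like processor.NewStripNewline.
type stripNewline struct{ in LineProcessor }

func (p stripNewline) Next() (string, error) {
	line, err := p.in.Next()
	return strings.TrimRight(line, "\r\n"), err
}

// limit caps line length, like processor.NewLimitProcessor with MaxBytes.
type limit struct {
	in       LineProcessor
	maxBytes int
}

func (p limit) Next() (string, error) {
	line, err := p.in.Next()
	if len(line) > p.maxBytes {
		line = line[:p.maxBytes]
	}
	return line, err
}

func main() {
	// Compose innermost-first, exactly the shape of newLineProcessor.
	var p LineProcessor = &sliceSource{lines: []string{"hello world\n", "a very long line\n"}}
	p = stripNewline{in: p}
	p = limit{in: p, maxBytes: 10}

	for {
		line, err := p.Next()
		if err != nil {
			break
		}
		fmt.Printf("%q\n", line) // "hello worl", then "a very lon"
	}
}
```

Moving the parameters onto the `Harvester` receiver is what lets the test below construct the pipeline from a configured struct instead of threading eight arguments through `createLineProcessor`.
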
32 changes: 22 additions & 10 deletions filebeat/harvester/log_test.go

```diff
@@ -55,18 +55,30 @@ func TestReadLine(t *testing.T) {
 	defer readFile.Close()
 	assert.Nil(t, err)
 
-	h := Harvester{}
+	f := source.File{readFile}
+
+	h := Harvester{
+		config: harvesterConfig{
+			CloseOlder:    500 * time.Millisecond,
+			Backoff:       100 * time.Millisecond,
+			MaxBackoff:    1 * time.Second,
+			BackoffFactor: 2,
+			BufferSize:    100,
+			MaxBytes:      1000,
+		},
+		file: f,
+	}
 	assert.NotNil(t, h)
 
-	// Read only 10 bytes which is not the end of the file
-	codec, _ := encoding.Plain(file)
-	readConfig := reader.LogFileReaderConfig{
-		CloseOlder:         500 * time.Millisecond,
-		BackoffDuration:    100 * time.Millisecond,
-		MaxBackoffDuration: 1 * time.Second,
-		BackoffFactor:      2,
-	}
-	r, _ := createLineProcessor(source.File{readFile}, codec, 100, 1000, readConfig, nil, nil, nil)
+	var ok bool
+	h.encodingFactory, ok = encoding.FindEncoding(h.config.Encoding)
+	assert.True(t, ok)
+
+	h.encoding, err = h.encodingFactory(readFile)
+	assert.NoError(t, err)
+
+	r, err := h.newLineProcessor()
+	assert.NoError(t, err)
 
 	// Read third line
 	_, text, bytesread, _, err := readLine(r)
```

…
