Skip to content

Commit

Permalink
Add lumberjack input to Filebeat
Browse files Browse the repository at this point in the history
Add an input for receiving data over the Lumberjack protocol as
defined in https://github.com/elastic/go-lumber.

The raw data is written into the `lumberjack` field which is
mapped as flattened.
  • Loading branch information
andrewkroh committed Jul 7, 2022
1 parent ba5d0cb commit c109b3b
Show file tree
Hide file tree
Showing 15 changed files with 994 additions and 0 deletions.
18 changes: 18 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ grouped in the following categories:
* <<exported-fields-kubernetes-processor>>
* <<exported-fields-log>>
* <<exported-fields-logstash>>
* <<exported-fields-lumberjack>>
* <<exported-fields-microsoft>>
* <<exported-fields-misp>>
* <<exported-fields-mongodb>>
Expand Down Expand Up @@ -87535,6 +87536,23 @@ alias to: event.duration

--

[[exported-fields-lumberjack]]
== Lumberjack fields

Fields from Lumberjack input.



*`lumberjack`*::
+
--
Structured data received in an event sent over the Lumberjack protocol.


type: flattened

--

[[exported-fields-microsoft]]
== Microsoft fields

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs_aix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/awss3"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -21,5 +22,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
httpjson.Plugin(log, store),
o365audit.Plugin(log, store),
awss3.Plugin(store),
lumberjack.Plugin(),
}
}
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -28,5 +29,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
o365audit.Plugin(log, store),
awss3.Plugin(store),
awscloudwatch.Plugin(store),
lumberjack.Plugin(),
}
}
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/lumberjack/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- key: lumberjack
title: "Lumberjack"
description: >
Fields from Lumberjack input.
fields:
- name: lumberjack
type: flattened
description: >
Structured data received in an event sent over the Lumberjack protocol.
78 changes: 78 additions & 0 deletions x-pack/filebeat/input/lumberjack/ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
)

// batchACKTracker invokes batchACK when all events associated to the batch
// have been published and acknowledged by an output.
type batchACKTracker struct {
batchACK func()

mutex sync.Mutex // mutex synchronizes access to pendingACKs.
pendingACKs int64 // Number of Beat events in lumberjack batch that are pending ACKs.
}

// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function
// is invoked after the full batch has been acknowledged. Ready() must be invoked
// after all events in the batch are published.
func newBatchACKTracker(batchACKCallback func()) *batchACKTracker {
return &batchACKTracker{
batchACK: batchACKCallback,
pendingACKs: 1, // Ready() must be called to consume this "1".
}
}

// Ready signals that the batch has been fully consumed. Only
// after the batch is marked as "ready" can the lumberjack batch
// be ACKed. This prevents the batch from being ACKed prematurely.
func (t *batchACKTracker) Ready() {
t.ACK()
}

// Add increments the number of pending ACKs.
func (t *batchACKTracker) Add() {
t.mutex.Lock()
defer t.mutex.Unlock()

t.pendingACKs++
}

// ACK decrements the number of pending event ACKs. When all pending ACKs are
// received then the lumberjack batch is ACKed.
func (t *batchACKTracker) ACK() {
t.mutex.Lock()
defer t.mutex.Unlock()

if t.pendingACKs <= 0 {
panic("misuse detected: negative ACK counter")
}

t.pendingACKs--
if t.pendingACKs == 0 {
t.batchACK()
}
}

// newEventACKHandler returns a beat ACKer that can receive callbacks when
// an event has been ACKed an output. If the event contains a private metadata
// pointing to a batchACKTracker then it will invoke the tracker's ACK() method
// to decrement the number of pending ACKs.
func newEventACKHandler() beat.ACKer {
return acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, privates []interface{}) {
for _, private := range privates {
if ack, ok := private.(*batchACKTracker); ok {
ack.ACK()
}
}
}),
)
}
46 changes: 46 additions & 0 deletions x-pack/filebeat/input/lumberjack/ack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/go-lumber/lj"
)

func TestBatchACKTracker(t *testing.T) {
t.Run("empty", func(t *testing.T) {
batch := lj.NewBatch(nil)

acker := newBatchACKTracker(batch.ACK)
require.False(t, isACKed(batch))

acker.Ready()
require.True(t, isACKed(batch))
})

t.Run("single_event", func(t *testing.T) {
batch := lj.NewBatch(nil)

acker := newBatchACKTracker(batch.ACK)
acker.Add()
acker.ACK()
require.False(t, isACKed(batch))

acker.Ready()
require.True(t, isACKed(batch))
})
}

func isACKed(batch *lj.Batch) bool {
select {
case <-batch.Await():
return true
default:
return false
}
}
39 changes: 39 additions & 0 deletions x-pack/filebeat/input/lumberjack/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"fmt"
"strings"
"time"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

type config struct {
ListenAddress string `config:"listen_address" validate:"nonzero"` // Bind address for the server (e.g. address:port). Default to localhost:5044.
Versions []string `config:"versions"` // List of Lumberjack version (e.g. v1, v2).
TLS *tlscommon.ServerConfig `config:"ssl"` // TLS options.
Keepalive time.Duration `config:"keepalive" validate:"min=0"` // Keepalive interval for notifying clients that batches that are not yet ACKed.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Read / write timeouts for Lumberjack server.
MaxConnections int `config:"max_connections" validate:"min=0"` // Maximum number of concurrent connections. Default is 0 which means no limit.
}

func (c *config) InitDefaults() {
c.ListenAddress = "localhost:5044"
c.Versions = []string{"v1", "v2"}
}

func (c *config) Validate() error {
for _, v := range c.Versions {
switch strings.ToLower(v) {
case "v1", "v2":
default:
return fmt.Errorf("invalid lumberjack version %q: allowed values are v1 and v2", v)
}
}

return nil
}
74 changes: 74 additions & 0 deletions x-pack/filebeat/input/lumberjack/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"testing"

"github.com/stretchr/testify/require"

conf "github.com/elastic/elastic-agent-libs/config"
)

func TestConfig(t *testing.T) {
testCases := []struct {
name string
userConfig map[string]interface{}
expected *config
expectedErr string
}{
{
"defaults",
map[string]interface{}{},
&config{
ListenAddress: "localhost:5044",
Versions: []string{"v1", "v2"},
},
"",
},
{
"validate version",
map[string]interface{}{
"versions": []string{"v3"},
},
nil,
`invalid lumberjack version "v3"`,
},
{
"validate keepalive",
map[string]interface{}{
"keepalive": "-1s",
},
nil,
`requires duration >= 0`,
},
{
"validate max_connections",
map[string]interface{}{
"max_connections": -1,
},
nil,
`requires value >= 0 accessing 'max_connections'`,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
c := conf.MustNewConfigFrom(tc.userConfig)

var ljConf config
err := c.Unpack(&ljConf)

if tc.expectedErr != "" {
require.Error(t, err, "expected error: %s", tc.expectedErr)
require.Contains(t, err.Error(), tc.expectedErr)
return
}

require.Equal(t, *tc.expected, ljConf)
})
}
}
23 changes: 23 additions & 0 deletions x-pack/filebeat/input/lumberjack/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c109b3b

Please sign in to comment.