Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Add lumberjack input #32175

Merged
merged 10 commits into from
Aug 19, 2022
18 changes: 18 additions & 0 deletions filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ grouped in the following categories:
* <<exported-fields-kubernetes-processor>>
* <<exported-fields-log>>
* <<exported-fields-logstash>>
* <<exported-fields-lumberjack>>
* <<exported-fields-microsoft>>
* <<exported-fields-misp>>
* <<exported-fields-mongodb>>
Expand Down Expand Up @@ -87535,6 +87536,23 @@ alias to: event.duration

--

[[exported-fields-lumberjack]]
== Lumberjack fields

Fields from Lumberjack input.



*`lumberjack`*::
+
--
Structured data received in an event sent over the Lumberjack protocol.


type: flattened

--

[[exported-fields-microsoft]]
== Microsoft fields

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs_aix.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/awss3"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -21,5 +22,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
httpjson.Plugin(log, store),
o365audit.Plugin(log, store),
awss3.Plugin(store),
lumberjack.Plugin(),
}
}
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/default-inputs/inputs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry"
"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -28,5 +29,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
o365audit.Plugin(log, store),
awss3.Plugin(store),
awscloudwatch.Plugin(store),
lumberjack.Plugin(),
}
}
9 changes: 9 additions & 0 deletions x-pack/filebeat/input/lumberjack/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- key: lumberjack
title: "Lumberjack"
description: >
Fields from Lumberjack input.
fields:
- name: lumberjack
type: flattened
description: >
Structured data received in an event sent over the Lumberjack protocol.
78 changes: 78 additions & 0 deletions x-pack/filebeat/input/lumberjack/ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
)

// batchACKTracker invokes batchACK when all events associated to the batch
// have been published and acknowledged by an output.
type batchACKTracker struct {
batchACK func()

mutex sync.Mutex // mutex synchronizes access to pendingACKs.
pendingACKs int64 // Number of Beat events in lumberjack batch that are pending ACKs.
}

// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function
// is invoked after the full batch has been acknowledged. Ready() must be invoked
// after all events in the batch are published.
func newBatchACKTracker(batchACKCallback func()) *batchACKTracker {
return &batchACKTracker{
batchACK: batchACKCallback,
pendingACKs: 1, // Ready() must be called to consume this "1".
}
}

// Ready signals that the batch has been fully consumed. Only
// after the batch is marked as "ready" can the lumberjack batch
// be ACKed. This prevents the batch from being ACKed prematurely.
func (t *batchACKTracker) Ready() {
t.ACK()
}

// Add increments the number of pending ACKs.
func (t *batchACKTracker) Add() {
t.mutex.Lock()
defer t.mutex.Unlock()

t.pendingACKs++
}

// ACK decrements the number of pending event ACKs. When all pending ACKs are
// received then the lumberjack batch is ACKed.
func (t *batchACKTracker) ACK() {
t.mutex.Lock()
defer t.mutex.Unlock()

if t.pendingACKs <= 0 {
panic("misuse detected: negative ACK counter")
belimawr marked this conversation as resolved.
Show resolved Hide resolved
}

t.pendingACKs--
if t.pendingACKs == 0 {
t.batchACK()
}
}

// newEventACKHandler returns a beat ACKer that can receive callbacks when
// an event has been ACKed an output. If the event contains a private metadata
// pointing to a batchACKTracker then it will invoke the tracker's ACK() method
// to decrement the number of pending ACKs.
func newEventACKHandler() beat.ACKer {
return acker.ConnectionOnly(
acker.EventPrivateReporter(func(_ int, privates []interface{}) {
for _, private := range privates {
if ack, ok := private.(*batchACKTracker); ok {
ack.ACK()
}
}
}),
)
}
46 changes: 46 additions & 0 deletions x-pack/filebeat/input/lumberjack/ack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/go-lumber/lj"
)

func TestBatchACKTracker(t *testing.T) {
t.Run("empty", func(t *testing.T) {
batch := lj.NewBatch(nil)

acker := newBatchACKTracker(batch.ACK)
require.False(t, isACKed(batch))

acker.Ready()
require.True(t, isACKed(batch))
})

t.Run("single_event", func(t *testing.T) {
batch := lj.NewBatch(nil)

acker := newBatchACKTracker(batch.ACK)
acker.Add()
acker.ACK()
require.False(t, isACKed(batch))

acker.Ready()
require.True(t, isACKed(batch))
})
}

func isACKed(batch *lj.Batch) bool {
select {
case <-batch.Await():
return true
default:
return false
}
}
39 changes: 39 additions & 0 deletions x-pack/filebeat/input/lumberjack/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"fmt"
"strings"
"time"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

type config struct {
ListenAddress string `config:"listen_address" validate:"nonzero"` // Bind address for the server (e.g. address:port). Default to localhost:5044.
Versions []string `config:"versions"` // List of Lumberjack version (e.g. v1, v2).
TLS *tlscommon.ServerConfig `config:"ssl"` // TLS options.
Keepalive time.Duration `config:"keepalive" validate:"min=0"` // Keepalive interval for notifying clients that batches that are not yet ACKed.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Read / write timeouts for Lumberjack server.
MaxConnections int `config:"max_connections" validate:"min=0"` // Maximum number of concurrent connections. Default is 0 which means no limit.
}

func (c *config) InitDefaults() {
c.ListenAddress = "localhost:5044"
c.Versions = []string{"v1", "v2"}
}

func (c *config) Validate() error {
for _, v := range c.Versions {
switch strings.ToLower(v) {
case "v1", "v2":
default:
return fmt.Errorf("invalid lumberjack version %q: allowed values are v1 and v2", v)
}
}

return nil
}
74 changes: 74 additions & 0 deletions x-pack/filebeat/input/lumberjack/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package lumberjack

import (
"testing"

"github.com/stretchr/testify/require"

conf "github.com/elastic/elastic-agent-libs/config"
)

func TestConfig(t *testing.T) {
testCases := []struct {
name string
userConfig map[string]interface{}
expected *config
expectedErr string
}{
{
"defaults",
map[string]interface{}{},
&config{
ListenAddress: "localhost:5044",
Versions: []string{"v1", "v2"},
},
"",
},
{
"validate version",
map[string]interface{}{
"versions": []string{"v3"},
},
nil,
`invalid lumberjack version "v3"`,
},
{
"validate keepalive",
map[string]interface{}{
"keepalive": "-1s",
},
nil,
`requires duration >= 0`,
},
{
"validate max_connections",
map[string]interface{}{
"max_connections": -1,
},
nil,
`requires value >= 0 accessing 'max_connections'`,
},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
c := conf.MustNewConfigFrom(tc.userConfig)

var ljConf config
err := c.Unpack(&ljConf)

if tc.expectedErr != "" {
require.Error(t, err, "expected error: %s", tc.expectedErr)
require.Contains(t, err.Error(), tc.expectedErr)
return
}

require.Equal(t, *tc.expected, ljConf)
})
}
}
23 changes: 23 additions & 0 deletions x-pack/filebeat/input/lumberjack/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading