diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index a75f574429a..05e0ac1c352 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -56,6 +56,7 @@ grouped in the following categories: * <> * <> * <> +* <> * <> * <> * <> @@ -87535,6 +87536,23 @@ alias to: event.duration -- +[[exported-fields-lumberjack]] +== Lumberjack fields + +Fields from Lumberjack input. + + + +*`lumberjack`*:: ++ +-- +Structured data received in an event sent over the Lumberjack protocol. + + +type: flattened + +-- + [[exported-fields-microsoft]] == Microsoft fields diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index 91bf5fee773..c1382ae3627 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -13,6 +13,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cometd" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub" + _ "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/activemq" _ "github.com/elastic/beats/v7/x-pack/filebeat/module/aws" diff --git a/x-pack/filebeat/input/default-inputs/inputs_aix.go b/x-pack/filebeat/input/default-inputs/inputs_aix.go index a1cdf5da43e..f46d8ed1f25 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_aix.go +++ b/x-pack/filebeat/input/default-inputs/inputs_aix.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" + "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" "github.com/elastic/elastic-agent-libs/logp" ) @@ -21,5 +22,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 httpjson.Plugin(log, store), o365audit.Plugin(log, store), awss3.Plugin(store), + lumberjack.Plugin(), } } diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index fa731041fa7..83b1bf8e66c 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" + "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" "github.com/elastic/elastic-agent-libs/logp" ) @@ -28,5 +29,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 o365audit.Plugin(log, store), awss3.Plugin(store), awscloudwatch.Plugin(store), + lumberjack.Plugin(), } } diff --git a/x-pack/filebeat/input/lumberjack/_meta/fields.yml b/x-pack/filebeat/input/lumberjack/_meta/fields.yml new file mode 100644 index 00000000000..ee3ef012006 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/_meta/fields.yml @@ -0,0 +1,9 @@ +- key: lumberjack + title: "Lumberjack" + description: > + Fields from Lumberjack input. + fields: + - name: lumberjack + type: flattened + description: > + Structured data received in an event sent over the Lumberjack protocol. diff --git a/x-pack/filebeat/input/lumberjack/ack.go b/x-pack/filebeat/input/lumberjack/ack.go new file mode 100644 index 00000000000..ab15ad157dc --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/ack.go @@ -0,0 +1,78 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/acker" +) + +// batchACKTracker invokes batchACK when all events associated to the batch +// have been published and acknowledged by an output. +type batchACKTracker struct { + batchACK func() + + mutex sync.Mutex // mutex synchronizes access to pendingACKs. + pendingACKs int64 // Number of Beat events in lumberjack batch that are pending ACKs. +} + +// newBatchACKTracker returns a new batchACKTracker. The provided batchACK function +// is invoked after the full batch has been acknowledged. Ready() must be invoked +// after all events in the batch are published. +func newBatchACKTracker(batchACKCallback func()) *batchACKTracker { + return &batchACKTracker{ + batchACK: batchACKCallback, + pendingACKs: 1, // Ready() must be called to consume this "1". + } +} + +// Ready signals that the batch has been fully consumed. Only +// after the batch is marked as "ready" can the lumberjack batch +// be ACKed. This prevents the batch from being ACKed prematurely. +func (t *batchACKTracker) Ready() { + t.ACK() +} + +// Add increments the number of pending ACKs. +func (t *batchACKTracker) Add() { + t.mutex.Lock() + defer t.mutex.Unlock() + + t.pendingACKs++ +} + +// ACK decrements the number of pending event ACKs. When all pending ACKs are +// received then the lumberjack batch is ACKed. +func (t *batchACKTracker) ACK() { + t.mutex.Lock() + defer t.mutex.Unlock() + + if t.pendingACKs <= 0 { + panic("misuse detected: negative ACK counter") + } + + t.pendingACKs-- + if t.pendingACKs == 0 { + t.batchACK() + } +} + +// newEventACKHandler returns a beat ACKer that can receive callbacks when +// an event has been ACKed an output. If the event contains a private metadata +// pointing to a batchACKTracker then it will invoke the tracker's ACK() method +// to decrement the number of pending ACKs. +func newEventACKHandler() beat.ACKer { + return acker.ConnectionOnly( + acker.EventPrivateReporter(func(_ int, privates []interface{}) { + for _, private := range privates { + if ack, ok := private.(*batchACKTracker); ok { + ack.ACK() + } + } + }), + ) +} diff --git a/x-pack/filebeat/input/lumberjack/ack_test.go b/x-pack/filebeat/input/lumberjack/ack_test.go new file mode 100644 index 00000000000..90e03819488 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/ack_test.go @@ -0,0 +1,46 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/go-lumber/lj" +) + +func TestBatchACKTracker(t *testing.T) { + t.Run("empty", func(t *testing.T) { + batch := lj.NewBatch(nil) + + acker := newBatchACKTracker(batch.ACK) + require.False(t, isACKed(batch)) + + acker.Ready() + require.True(t, isACKed(batch)) + }) + + t.Run("single_event", func(t *testing.T) { + batch := lj.NewBatch(nil) + + acker := newBatchACKTracker(batch.ACK) + acker.Add() + acker.ACK() + require.False(t, isACKed(batch)) + + acker.Ready() + require.True(t, isACKed(batch)) + }) +} + +func isACKed(batch *lj.Batch) bool { + select { + case <-batch.Await(): + return true + default: + return false + } +} diff --git a/x-pack/filebeat/input/lumberjack/config.go b/x-pack/filebeat/input/lumberjack/config.go new file mode 100644 index 00000000000..53ceed2f8ce --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/config.go @@ -0,0 +1,39 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "fmt" + "strings" + "time" + + "github.com/elastic/elastic-agent-libs/transport/tlscommon" +) + +type config struct { + ListenAddress string `config:"listen_address" validate:"nonzero"` // Bind address for the server (e.g. address:port). Default to localhost:5044. + Versions []string `config:"versions"` // List of Lumberjack version (e.g. v1, v2). + TLS *tlscommon.ServerConfig `config:"ssl"` // TLS options. + Keepalive time.Duration `config:"keepalive" validate:"min=0"` // Keepalive interval for notifying clients that batches that are not yet ACKed. + Timeout time.Duration `config:"timeout" validate:"min=0"` // Read / write timeouts for Lumberjack server. + MaxConnections int `config:"max_connections" validate:"min=0"` // Maximum number of concurrent connections. Default is 0 which means no limit. +} + +func (c *config) InitDefaults() { + c.ListenAddress = "localhost:5044" + c.Versions = []string{"v1", "v2"} +} + +func (c *config) Validate() error { + for _, v := range c.Versions { + switch strings.ToLower(v) { + case "v1", "v2": + default: + return fmt.Errorf("invalid lumberjack version %q: allowed values are v1 and v2", v) + } + } + + return nil +} diff --git a/x-pack/filebeat/input/lumberjack/config_test.go b/x-pack/filebeat/input/lumberjack/config_test.go new file mode 100644 index 00000000000..5b9e73d4d7c --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/config_test.go @@ -0,0 +1,74 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "testing" + + "github.com/stretchr/testify/require" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func TestConfig(t *testing.T) { + testCases := []struct { + name string + userConfig map[string]interface{} + expected *config + expectedErr string + }{ + { + "defaults", + map[string]interface{}{}, + &config{ + ListenAddress: "localhost:5044", + Versions: []string{"v1", "v2"}, + }, + "", + }, + { + "validate version", + map[string]interface{}{ + "versions": []string{"v3"}, + }, + nil, + `invalid lumberjack version "v3"`, + }, + { + "validate keepalive", + map[string]interface{}{ + "keepalive": "-1s", + }, + nil, + `requires duration >= 0`, + }, + { + "validate max_connections", + map[string]interface{}{ + "max_connections": -1, + }, + nil, + `requires value >= 0 accessing 'max_connections'`, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + c := conf.MustNewConfigFrom(tc.userConfig) + + var ljConf config + err := c.Unpack(&ljConf) + + if tc.expectedErr != "" { + require.Error(t, err, "expected error: %s", tc.expectedErr) + require.Contains(t, err.Error(), tc.expectedErr) + return + } + + require.Equal(t, *tc.expected, ljConf) + }) + } +} diff --git a/x-pack/filebeat/input/lumberjack/fields.go b/x-pack/filebeat/input/lumberjack/fields.go new file mode 100644 index 00000000000..d54be5d16eb --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/fields.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by beats/dev-tools/cmd/asset/asset.go - DO NOT EDIT. + +package lumberjack + +import ( + "github.com/elastic/beats/v7/libbeat/asset" +) + +func init() { + if err := asset.SetFields("filebeat", "lumberjack", asset.ModuleFieldsPri, AssetLumberjack); err != nil { + panic(err) + } +} + +// AssetLumberjack returns asset data. +// This is the base64 encoded zlib format compressed contents of input/lumberjack. +func AssetLumberjack() string { + return "eJxsjjEOwjAQBHu/YpU+eYALSio6XmDsjTBxbOtyjpTfo0QIEGKLK/ZGmu0xcbNIbb5RHs5PBtCoiRbd5V12BghcvMSqsWSLkwGAc2QKC0YpMz4wYq5NBwOMx98ebI/sZv6Y9uhWaTEmp8rM8Gr/2PZcVZrXJgwITh2EnnFlQMxwGVyZFct+ykqB3vk9rErR4ksazDMAAP//JmxQDQ==" +} diff --git a/x-pack/filebeat/input/lumberjack/generate_certs_test.go b/x-pack/filebeat/input/lumberjack/generate_certs_test.go new file mode 100644 index 00000000000..e66eb1b5b8b --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/generate_certs_test.go @@ -0,0 +1,153 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "net" + "testing" + "time" +) + +type Cert struct { + signedCertDER []byte // DER encoded certificate from x509.CreateCertificate. + key *rsa.PrivateKey // RSA public / private key pair. +} + +// CertPEM returns the cert encoded as PEM. +func (c Cert) CertPEM(t testing.TB) []byte { return pemEncode(t, c.signedCertDER, "CERTIFICATE") } + +// KeyPEM returns the private key encoded as PEM. +func (c Cert) KeyPEM(t testing.TB) []byte { + return pemEncode(t, x509.MarshalPKCS1PrivateKey(c.key), "RSA PRIVATE KEY") +} + +func (c Cert) TLSCertificate(t testing.TB) tls.Certificate { + pair, err := tls.X509KeyPair(c.CertPEM(t), c.KeyPEM(t)) + if err != nil { + t.Fatal(err) + } + + return pair +} + +// generateCertData creates a root CA, server, and client cert suitable for +// testing mTLS. +func generateCertData(t testing.TB) (rootCA, client, server Cert) { + t.Helper() + + // CA cert + ca := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + Organization: []string{"Elastic"}, + Country: []string{"US"}, + Locality: []string{"San Francisco"}, + StreetAddress: []string{"West El Camino Real"}, + PostalCode: []string{"94040"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(0, 0, 1), + IsCA: true, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + BasicConstraintsValid: true, + } + + var err error + rootCA.key, err = rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + t.Fatal(err) + } + + rootCA.signedCertDER, err = x509.CreateCertificate(rand.Reader, ca, ca, &rootCA.key.PublicKey, rootCA.key) + if err != nil { + t.Fatal(err) + } + + // Server cert + { + // set up our server certificate + serverCert := &x509.Certificate{ + SerialNumber: big.NewInt(2), + Subject: pkix.Name{ + Organization: []string{"Elastic"}, + Country: []string{"US"}, + Locality: []string{"San Francisco"}, + StreetAddress: []string{"West El Camino Real"}, + PostalCode: []string{"94040"}, + CommonName: "server", + }, + IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1), net.IPv6loopback}, + DNSNames: []string{"localhost"}, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(0, 0, 1), + SubjectKeyId: []byte{1, 2, 3, 4, 5}, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + } + + server.key, err = rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + t.Fatal(err) + } + + server.signedCertDER, err = x509.CreateCertificate(rand.Reader, serverCert, ca, &server.key.PublicKey, rootCA.key) + if err != nil { + t.Fatal(err) + } + } + + // Client cert. + { + clientCert := &x509.Certificate{ + SerialNumber: big.NewInt(3), + Subject: pkix.Name{ + Organization: []string{"Elastic"}, + Country: []string{"US"}, + Locality: []string{"San Francisco"}, + StreetAddress: []string{"West El Camino Real"}, + PostalCode: []string{"94040"}, + CommonName: "client", + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(0, 0, 1), + SubjectKeyId: []byte{1, 2, 3, 4, 5}, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature, + EmailAddresses: []string{"client@example.com"}, + } + + client.key, err = rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + t.Fatal(err) + } + + client.signedCertDER, err = x509.CreateCertificate(rand.Reader, clientCert, ca, &client.key.PublicKey, rootCA.key) + if err != nil { + t.Fatal(err) + } + } + + return rootCA, client, server +} + +func pemEncode(t testing.TB, certBytes []byte, certType string) []byte { + t.Helper() + + pemData := new(bytes.Buffer) + if err := pem.Encode(pemData, &pem.Block{Type: certType, Bytes: certBytes}); err != nil { + t.Fatal(err) + } + + return pemData.Bytes() +} diff --git a/x-pack/filebeat/input/lumberjack/input.go b/x-pack/filebeat/input/lumberjack/input.go new file mode 100644 index 00000000000..b95d0946189 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/input.go @@ -0,0 +1,92 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "fmt" + + inputv2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/monitoring" +) + +const ( + inputName = "lumberjack" +) + +func Plugin() inputv2.Plugin { + return inputv2.Plugin{ + Name: inputName, + Stability: feature.Beta, + Info: "Receives data streamed via the Lumberjack protocol.", + Manager: inputv2.ConfigureWith(configure), + } +} + +func configure(cfg *conf.C) (inputv2.Input, error) { + var lumberjackConfig config + if err := cfg.Unpack(&lumberjackConfig); err != nil { + return nil, err + } + + return newLumberjackInput(lumberjackConfig) +} + +// lumberjackInput implements the Filebeat input V2 interface. The input is stateless. +type lumberjackInput struct { + config config +} + +var _ inputv2.Input = (*lumberjackInput)(nil) + +func newLumberjackInput(lumberjackConfig config) (*lumberjackInput, error) { + return &lumberjackInput{config: lumberjackConfig}, nil +} + +func (i *lumberjackInput) Name() string { return inputName } + +func (i *lumberjackInput) Test(inputCtx inputv2.TestContext) error { + s, err := newServer(i.config, inputCtx.Logger, nil, nil) + if err != nil { + return err + } + return s.Close() +} + +func (i *lumberjackInput) Run(inputCtx inputv2.Context, pipeline beat.Pipeline) error { + inputCtx.Logger.Info("Starting " + inputName + " input") + defer inputCtx.Logger.Info(inputName + " input stopped") + + // Create client for publishing events and receive notification of their ACKs. + client, err := pipeline.ConnectWith(beat.ClientConfig{ + CloseRef: inputCtx.Cancelation, + ACKHandler: newEventACKHandler(), + }) + if err != nil { + return fmt.Errorf("failed to create pipeline client: %w", err) + } + defer client.Close() + + metricRegistry := monitoring.GetNamespace("dataset").GetRegistry() + metrics := newInputMetrics(metricRegistry, inputCtx.ID) + defer metrics.Close() + + s, err := newServer(i.config, inputCtx.Logger, client.Publish, metrics) + if err != nil { + return err + } + defer s.Close() + + // Shutdown the server when cancellation is signaled. + go func() { + <-inputCtx.Cancelation.Done() + s.Close() + }() + + // Run server until the cancellation signal. + return s.Run() +} diff --git a/x-pack/filebeat/input/lumberjack/metrics.go b/x-pack/filebeat/input/lumberjack/metrics.go new file mode 100644 index 00000000000..ebceeb397b7 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/metrics.go @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "github.com/rcrowley/go-metrics" + + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +type inputMetrics struct { + id string // Input ID. + parent *monitoring.Registry // Parent registry holding this input's ID as a key. + bindAddress *monitoring.String // Bind address of input. + + batchesReceivedTotal *monitoring.Uint // Number of Lumberjack batches received (not necessarily processed fully). + batchesACKedTotal *monitoring.Uint // Number of Lumberjack batches ACKed. + messagesReceivedTotal *monitoring.Uint // Number of Lumberjack messages received (not necessarily processed fully). + batchProcessingTime metrics.Sample // Histogram of the elapsed batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). +} + +// Close removes the metrics from the registry. +func (m *inputMetrics) Close() { + m.parent.Remove(m.id) +} + +func newInputMetrics(parent *monitoring.Registry, id string) *inputMetrics { + reg := parent.NewRegistry(id) + monitoring.NewString(reg, "input").Set(inputName) + monitoring.NewString(reg, "id").Set(id) + out := &inputMetrics{ + id: id, + parent: reg, + bindAddress: monitoring.NewString(reg, "bind_address"), + batchesReceivedTotal: monitoring.NewUint(reg, "batches_received_total"), + batchesACKedTotal: monitoring.NewUint(reg, "batches_acked_total"), + messagesReceivedTotal: monitoring.NewUint(reg, "messages_received_total"), + batchProcessingTime: metrics.NewUniformSample(1024), + } + adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.batchProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. + + return out +} diff --git a/x-pack/filebeat/input/lumberjack/server.go b/x-pack/filebeat/input/lumberjack/server.go new file mode 100644 index 00000000000..e6f7ddcd626 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/server.go @@ -0,0 +1,172 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "crypto/tls" + "net" + "strings" + "sync" + "time" + + "golang.org/x/net/netutil" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" + lumber "github.com/elastic/go-lumber/server" +) + +type server struct { + config config + log *logp.Logger + publish func(beat.Event) + metrics *inputMetrics + ljSvr lumber.Server + ljSvrCloseOnce sync.Once + bindAddress string +} + +func newServer(c config, log *logp.Logger, pub func(beat.Event), metrics *inputMetrics) (*server, error) { + ljSvr, bindAddress, err := newLumberjack(c) + if err != nil { + return nil, err + } + + if metrics == nil { + metrics = newInputMetrics(monitoring.NewRegistry(), "") + } + + bindURI := "tcp://" + bindAddress + if c.TLS.IsEnabled() { + bindURI = "tls://" + bindAddress + } + log.Infof(inputName+" is listening at %v.", bindURI) + metrics.bindAddress.Set(bindURI) + + return &server{ + config: c, + log: log.With("listen_address", bindAddress), + publish: pub, + metrics: metrics, + ljSvr: ljSvr, + bindAddress: bindAddress, + }, nil +} + +func (s *server) Close() error { + var err error + s.ljSvrCloseOnce.Do(func() { + err = s.ljSvr.Close() + }) + return err +} + +func (s *server) Run() error { + return s.processBatches() +} + +func (s *server) processBatches() error { + for batch := range s.ljSvr.ReceiveChan() { + s.metrics.batchesReceivedTotal.Inc() + + if len(batch.Events) == 0 { + batch.ACK() + s.metrics.batchesACKedTotal.Inc() + continue + } + s.metrics.messagesReceivedTotal.Add(uint64(len(batch.Events))) + + // Track all the Beat events associated to the Lumberjack batch so that + // the batch can be ACKed after the Beat events are delivered successfully. + start := time.Now() + acker := newBatchACKTracker(func() { + batch.ACK() + s.metrics.batchesACKedTotal.Inc() + s.metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) + }) + + for _, ljEvent := range batch.Events { + acker.Add() + s.publish(makeEvent(ljEvent, acker)) + } + + // Mark the batch as "ready" after Beat events are generated for each + // Lumberjack event. + acker.Ready() + } + + return nil +} + +func makeEvent(lumberjackEvent interface{}, acker *batchACKTracker) beat.Event { + return beat.Event{ + Timestamp: time.Now().UTC(), + Fields: map[string]interface{}{ + "lumberjack": lumberjackEvent, + }, + Private: acker, + } +} + +func newLumberjack(c config) (lj lumber.Server, bindAddress string, err error) { + // Setup optional TLS. + var tlsConfig *tls.Config + if c.TLS.IsEnabled() { + elasticTLSConfig, err := tlscommon.LoadTLSServerConfig(c.TLS) + if err != nil { + return nil, "", err + } + + // NOTE: Passing an empty string disables checking the client certificate for a + // specific hostname. + tlsConfig = elasticTLSConfig.BuildServerConfig("") + } + + // Start listener. + l, err := net.Listen("tcp", c.ListenAddress) + if err != nil { + return nil, "", err + } + if tlsConfig != nil { + l = tls.NewListener(l, tlsConfig) + } + if c.MaxConnections > 0 { + l = netutil.LimitListener(l, c.MaxConnections) + } + + // Start lumberjack server. + s, err := lumber.NewWithListener(l, makeLumberjackOptions(c)...) + if err != nil { + return nil, "", err + } + + return s, l.Addr().String(), nil +} + +func makeLumberjackOptions(c config) []lumber.Option { + var opts []lumber.Option + + // Versions + for _, p := range c.Versions { + switch strings.ToLower(p) { + case "v1": + opts = append(opts, lumber.V1(true)) + case "v2": + opts = append(opts, lumber.V2(true)) + } + } + + if c.Keepalive > 0 { + opts = append(opts, lumber.Keepalive(c.Keepalive)) + } + + if c.Timeout > 0 { + opts = append(opts, lumber.Timeout(c.Keepalive)) + } + + return opts +} diff --git a/x-pack/filebeat/input/lumberjack/server_test.go b/x-pack/filebeat/input/lumberjack/server_test.go new file mode 100644 index 00000000000..69c53941c06 --- /dev/null +++ b/x-pack/filebeat/input/lumberjack/server_test.go @@ -0,0 +1,238 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package lumberjack + +import ( + "context" + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "net" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" + client "github.com/elastic/go-lumber/client/v2" +) + +const testTimeout = 10 * time.Second + +func TestServer(t *testing.T) { + makeTestConfig := func() config { + var c config + c.InitDefaults() + c.ListenAddress = "localhost:0" + c.MaxConnections = 1 + c.Keepalive = time.Second + c.Timeout = time.Second + return c + } + + t.Run("empty_batch", func(t *testing.T) { + testSendReceive(t, makeTestConfig(), 0, nil) + }) + + t.Run("no tls", func(t *testing.T) { + testSendReceive(t, makeTestConfig(), 10, nil) + }) + + t.Run("tls", func(t *testing.T) { + clientConf, serverConf := tlsSetup(t) + clientConf.Certificates = nil + + c := makeTestConfig() + c.TLS = serverConf + c.TLS.ClientAuth = 0 // tls.NoClientCert + c.TLS.VerificationMode = tlscommon.VerifyNone + + testSendReceive(t, c, 10, clientConf) + }) + + t.Run("mutual tls", func(t *testing.T) { + clientConf, serverConf := tlsSetup(t) + + c := makeTestConfig() + c.TLS = serverConf + + testSendReceive(t, c, 10, clientConf) + }) +} + +func testSendReceive(t testing.TB, c config, numberOfEvents int, clientTLSConfig *tls.Config) { + require.NoError(t, logp.TestingSetup()) + log := logp.NewLogger(inputName).With("test_name", t.Name()) + + ctx, shutdown := context.WithTimeout(context.Background(), testTimeout) + t.Cleanup(shutdown) + collect := newEventCollector(ctx, numberOfEvents) + + // Start server. + s, err := newServer(c, log, collect.Publish, nil) + require.NoError(t, err) + go func() { + <-ctx.Done() + s.Close() + }() + + // Asynchronously send and receive events. + var wg errgroup.Group + wg.Go(s.Run) + wg.Go(func() error { + // The client returns on error or after an E2E ACK is received. + // In both cases the test should shutdown. + defer shutdown() + + return sendData(ctx, t, s.bindAddress, numberOfEvents, clientTLSConfig) + }) + + // Wait for the expected number of events. + collect.Await(t) + + // Check for errors from client and server. + require.NoError(t, wg.Wait()) +} + +func sendData(ctx context.Context, t testing.TB, bindAddress string, numberOfEvents int, clientTLSConfig *tls.Config) error { + _, port, err := net.SplitHostPort(bindAddress) + if err != nil { + return err + } + + dialFunc := net.Dial + if clientTLSConfig != nil { + dialer := &tls.Dialer{ + Config: clientTLSConfig, + } + dialFunc = dialer.Dial + } + + c, err := client.SyncDialWith(dialFunc, net.JoinHostPort("localhost", port)) + if err != nil { + return fmt.Errorf("client dial error: %w", err) + } + defer c.Close() + go func() { + <-ctx.Done() + time.Sleep(time.Second) + c.Close() + }() + t.Log("Lumberjack client connected.") + + var events []interface{} + for i := 0; i < numberOfEvents; i++ { + events = append(events, map[string]interface{}{ + "message": "hello world!", + "index": i, + }) + } + + if _, err = c.Send(events); err != nil { + return fmt.Errorf("failed sending lumberjack events: %w", err) + } + t.Log("Lumberjack client sent", len(events), "events.") + + return nil +} + +type eventCollector struct { + sync.Mutex + events []beat.Event + awaitCtx context.Context // awaitCtx is cancelled when events length is expectedSize. + awaitCancel context.CancelFunc + expectedSize int +} + +func newEventCollector(ctx context.Context, expectedSize int) *eventCollector { + ctx, cancel := context.WithCancel(ctx) + if expectedSize == 0 { + cancel() + } + + return &eventCollector{ + awaitCtx: ctx, + awaitCancel: cancel, + expectedSize: expectedSize, + } +} + +func (c *eventCollector) Publish(evt beat.Event) { + c.Lock() + defer c.Unlock() + + c.events = append(c.events, evt) + evt.Private.(*batchACKTracker).ACK() + + if len(c.events) == c.expectedSize { + c.awaitCancel() + } +} + +func (c *eventCollector) Await(t testing.TB) []beat.Event { + t.Helper() + + <-c.awaitCtx.Done() + if errors.Is(c.awaitCtx.Err(), context.DeadlineExceeded) { + t.Fatal(c.awaitCtx.Err()) + } + + c.Lock() + defer c.Unlock() + + if len(c.events) > c.expectedSize { + t.Fatalf("more events received than expected, got %d, want %d", len(c.events), c.expectedSize) + } + + events := make([]beat.Event, len(c.events)) + copy(events, c.events) + return events +} + +var ( + certDataOnce sync.Once + certData = struct { + ca, client, server Cert + }{} +) + +// tlsSetup return client and server configurations ready to test mutual TLS. +func tlsSetup(t *testing.T) (clientConfig *tls.Config, serverConfig *tlscommon.ServerConfig) { + t.Helper() + + certDataOnce.Do(func() { + certData.ca, certData.client, certData.server = generateCertData(t) + }) + + certPool := x509.NewCertPool() + certPool.AppendCertsFromPEM(certData.ca.CertPEM(t)) + + clientConfig = &tls.Config{ + RootCAs: certPool, + Certificates: []tls.Certificate{certData.client.TLSCertificate(t)}, + MinVersion: tls.VersionTLS12, + } + + serverConfig = &tlscommon.ServerConfig{ + // NOTE: VerifyCertificate is ineffective unless ClientAuth is set to RequireAndVerifyClientCert. + VerificationMode: tlscommon.VerifyCertificate, + // Unfortunately ServerConfig uses an unexported type in an exported field. + ClientAuth: 4, // tls.RequireAndVerifyClientCert + CAs: []string{ + string(certData.ca.CertPEM(t)), + }, + Certificate: tlscommon.CertificateConfig{ + Certificate: string(certData.server.CertPEM(t)), + Key: string(certData.server.KeyPEM(t)), + }, + } + + return clientConfig, serverConfig +}