14 changes: 6 additions & 8 deletions x-pack/dockerlogbeat/docs/install.asciidoc
@@ -81,10 +81,9 @@ example:
["source","sh",subs="attributes"]
----
docker run --log-driver=elastic/{log-driver-alias}:{version} \
--log-opt output.elasticsearch.hosts="https://myhost:9200" \
--log-opt output.elasticsearch.username="myusername" \
--log-opt output.elasticsearch.password="mypassword" \
--log-opt output.elasticsearch.index="elastic-log-driver-%{+yyyy.MM.dd}" \
--log-opt endpoint="https://myhost:9200" \
--log-opt user="myusername" \
--log-opt password="mypassword" \
-it debian:jessie /bin/bash
----
// end::log-driver-run[]
@@ -100,10 +99,9 @@ example:
{
"log-driver" : "elastic/{log-driver-alias}:{version}",
"log-opts" : {
"output.elasticsearch.hosts" : "https://myhost:9200",
"output.elasticsearch.username" : "myusername",
"output.elasticsearch.password" : "mypassword",
"output.elasticsearch.index" : "elastic-log-driver-%{+yyyy.MM.dd}"
"endpoint" : "https://myhost:9200",
"user" : "myusername",
"password" : "mypassword"
}
}
----
1 change: 0 additions & 1 deletion x-pack/dockerlogbeat/docs/limitations.asciidoc
@@ -8,6 +8,5 @@ This release of the {log-driver} has the following known problems and
limitations:

* Spool to disk (beta) is not supported.
* Complex config options can't be easily represented via `--log-opts`.
* Mapping templates and other assets that are normally installed by the
{beats} setup are not available.
37 changes: 19 additions & 18 deletions x-pack/dockerlogbeat/docs/usage.asciidoc
@@ -17,11 +17,9 @@ The following examples show common configurations for the {log-driver}.
["source","sh",subs="attributes"]
----
docker run --log-driver=elastic/{log-driver-alias}:{version} \
--log-opt output.elasticsearch.hosts="myhost:9200" \
--log-opt output.elasticsearch.protocol="https" \
--log-opt output.elasticsearch.username="myusername" \
--log-opt output.elasticsearch.password="mypassword" \
--log-opt output.elasticsearch.index="elastic-log-driver-%{+yyyy.MM.dd}" \
--log-opt endpoint="myhost:9200" \
--log-opt user="myusername" \
--log-opt password="mypassword" \
-it debian:jessie /bin/bash
----

@@ -32,11 +30,9 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \
{
"log-driver" : "elastic/{log-driver-alias}:{version}",
"log-opts" : {
"output.elasticsearch.hosts" : "myhost:9200",
"output.elasticsearch.protocol" : "https",
"output.elasticsearch.username" : "myusername",
"output.elasticsearch.password" : "mypassword",
"output.elasticsearch.index" : "elastic-log-driver-%{+yyyy.MM.dd}"
"endpoint" : "myhost:9200",
"user" : "myusername",
"password" : "mypassword",
}
}
----
@@ -49,9 +45,8 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \
["source","sh",subs="attributes"]
----
docker run --log-driver=elastic/{log-driver-alias}:{version} \
--log-opt cloud.id="MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5" \
--log-opt cloud.auth="myusername:mypassword" \
--log-opt output.elasticsearch.index="elastic-log-driver-%{+yyyy.MM.dd}" \
--log-opt cloud_id="MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5" \
--log-opt cloud_auth="myusername:mypassword" \
-it debian:jessie /bin/bash
----

@@ -62,22 +57,25 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \
{
"log-driver" : "elastic/{log-driver-alias}:{version}",
"log-opts" : {
"cloud.id" : "MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5",
"cloud.auth" : "myusername:mypassword",
"cloud_id" : "MyElasticStack:daMbY2VudHJhbDekZ2NwLmN4b3VkLmVzLmliJDVkYmQwtGJiYjs0NTRiN4Q5ODJmNGUwm1IxZmFkNjM5JDFiNjdkMDE4MTgxMTQzNTM5ZGFiYWJjZmY0OWIyYWE5",
"cloud_auth" : "myusername:mypassword",
"output.elasticsearch.index" : "elastic-log-driver-%{+yyyy.MM.dd}"
}
}
----
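
For background, a Cloud ID is just a label, a colon, and a base64-encoded `host$es-uuid$kibana-uuid` triple; libbeat's `cloudid` package (imported by the plugin, see libbeattools.go below) expands it into an Elasticsearch URL. A minimal sketch of that decoding, for illustration only:

["source","go"]
----
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// decodeCloudID sketches what libbeat's cloudid package does with cloud_id:
// "<label>:base64(<host>$<es-uuid>$<kibana-uuid>)" becomes an HTTPS endpoint
// with the Elasticsearch UUID as a subdomain of the shared host.
// Error handling is abbreviated for illustration.
func decodeCloudID(cloudID string) (string, error) {
	parts := strings.SplitN(cloudID, ":", 2)
	decoded, err := base64.StdEncoding.DecodeString(parts[len(parts)-1])
	if err != nil {
		return "", err
	}
	words := strings.Split(string(decoded), "$")
	if len(words) < 2 {
		return "", fmt.Errorf("invalid cloud id: %s", cloudID)
	}
	return fmt.Sprintf("https://%s.%s", words[1], words[0]), nil
}

func main() {
	url, err := decodeCloudID("name:" + base64.StdEncoding.EncodeToString(
		[]byte("us-central1.gcp.cloud.es.io$esuuid$kibanauuid")))
	fmt.Println(url, err) // https://esuuid.us-central1.gcp.cloud.es.io <nil>
}
----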

[float]
=== Send Docker logs to {ls}
=== Specify a custom index and template

*Docker run command:*

["source","sh",subs="attributes"]
----
docker run --log-driver=elastic/{log-driver-alias}:{version} \
--log-opt output.logstash.hosts="myhost:5044" \
--log-opt endpoint="myhost:9200" \
--log-opt user="myusername" \
--log-opt password="mypassword" \
--log-opt index="eld-%{[agent.version]}-%{+yyyy.MM.dd}" \
-it debian:jessie /bin/bash
----

@@ -88,7 +86,10 @@ docker run --log-driver=elastic/{log-driver-alias}:{version} \
{
"log-driver" : "elastic/{log-driver-alias}:{version}",
"log-opts" : {
"output.logstash.hosts" : "myhost:5044"
"endpoint" : "myhost:9200",
"user" : "myusername",
"index" : "eld-%{[agent.version]}-%{+yyyy.MM.dd}",
"password" : "mypassword",
}
}
----
11 changes: 7 additions & 4 deletions x-pack/dockerlogbeat/handlers.go
@@ -7,8 +7,6 @@ package main
import (
"encoding/json"
"net/http"
"os"
"path/filepath"

"github.com/docker/docker/daemon/logger"

@@ -38,12 +36,17 @@ func startLoggingHandler(pm *pipelinemanager.PipelineManager) func(w http.Respon
return
}

pm.Logger.Debugf("Homepath: %v\n", filepath.Dir(os.Args[0]))
pm.Logger.Infof("Got start request object from container %#v\n", startReq.Info.ContainerName)
pm.Logger.Debugf("Got a container with the following labels: %#v\n", startReq.Info.ContainerLabels)
pm.Logger.Debugf("Got a container with the following log opts: %#v\n", startReq.Info.Config)

cl, err := pm.CreateClientWithConfig(startReq.Info, startReq.File)
cfg, err := pipelinemanager.NewCfgFromRaw(startReq.Info.Config)
if err != nil {
http.Error(w, errors.Wrap(err, "error creating client config").Error(), http.StatusBadRequest)
return
}
pm.Logger.Debugf("Got config: %#v", cfg)
cl, err := pm.CreateClientWithConfig(cfg, startReq.Info, startReq.File)
if err != nil {
http.Error(w, errors.Wrap(err, "error creating client").Error(), http.StatusBadRequest)
return
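
For context, these handlers implement Docker's log-driver plugin protocol: when a container starts, the daemon POSTs JSON to the plugin's `/LogDriver.StartLogging` endpoint with the FIFO path and container metadata. A sketch of the request shape the handler decodes (the concrete struct in the plugin is built on docker's `logger.Info`):

["source","go"]
----
package main

import "github.com/docker/docker/daemon/logger"

// startLoggingRequest mirrors the JSON body Docker sends to
// /LogDriver.StartLogging; a sketch for illustration.
type startLoggingRequest struct {
	// File is the path to the FIFO the daemon streams log messages into.
	File string
	// Info carries container metadata; Info.Config holds the --log-opt
	// key/value pairs that NewCfgFromRaw parses above.
	Info logger.Info
}
----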
8 changes: 4 additions & 4 deletions x-pack/dockerlogbeat/pipelinemanager/clientLogReader.go
@@ -25,14 +25,14 @@ import (
type ClientLogger struct {
logFile *pipereader.PipeReader
client beat.Client
pipelineHash string
pipelineHash uint64
closer chan struct{}
containerMeta logger.Info
logger *logp.Logger
}

// newClientFromPipeline creates a new Client logger with a FIFO reader and beat client
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hashstring string, info logger.Info) (*ClientLogger, error) {
func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereader.PipeReader, hash uint64, info logger.Info) (*ClientLogger, error) {
// setup the beat client
settings := beat.ClientConfig{
WaitClose: 0,
@@ -47,9 +47,9 @@ func newClientFromPipeline(pipeline beat.PipelineConnector, inputFile *pipereade
return nil, err
}

clientLogger.Debugf("Created new logger for %s", hashstring)
clientLogger.Debugf("Created new logger for %d", hash)

return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hashstring, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
return &ClientLogger{logFile: inputFile, client: client, pipelineHash: hash, closer: make(chan struct{}), containerMeta: info, logger: clientLogger}, nil
}

// Close closes the pipeline client and reader
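
The `pipelineHash` field changing from string to `uint64` implies a numeric config hash. A minimal sketch of one deterministic way to compute such a value, assuming FNV-1a over sorted key/value pairs (the hash function the PR actually uses is not shown in this diff):

["source","go"]
----
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashConfig returns a deterministic uint64 for a --log-opts map by
// feeding "key=value" pairs to FNV-1a in sorted key order.
// A sketch only, not the PR's implementation.
func hashConfig(opts map[string]string) uint64 {
	keys := make([]string, 0, len(opts))
	for k := range opts {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := fnv.New64a()
	for _, k := range keys {
		fmt.Fprintf(h, "%s=%s;", k, opts[k])
	}
	return h.Sum64()
}

func main() {
	fmt.Println(hashConfig(map[string]string{"endpoint": "myhost:9200"}))
}
----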
19 changes: 18 additions & 1 deletion x-pack/dockerlogbeat/pipelinemanager/clientLogReader_test.go
@@ -18,6 +18,23 @@ import (
"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipereader"
)

func TestConfigHosts(t *testing.T) {
testHostEmpty := map[string]string{
"api_key": "keykey",
}
_, err := NewCfgFromRaw(testHostEmpty)
assert.Error(t, err)

testMultiHost := map[string]string{
"endpoint": "endpoint1,endpoint2",
}
goodOut := []string{"endpoint1", "endpoint2"}
cfg, err := NewCfgFromRaw(testMultiHost)
assert.NoError(t, err)
assert.Equal(t, goodOut, cfg.Endpoint)

}

func TestNewClient(t *testing.T) {
logString := "This is a log line"
cfgObject := logger.Info{
@@ -68,7 +85,7 @@ func createNewClient(t *testing.T, logString string, mockConnector *pipelinemock
reader, err := pipereader.NewReaderFromReadCloser(pipelinemock.CreateTestInputFromLine(t, logString))
require.NoError(t, err)

client, err := newClientFromPipeline(mockConnector, reader, "aaa", cfgObject)
client, err := newClientFromPipeline(mockConnector, reader, 123, cfgObject)
require.NoError(t, err)

return client
75 changes: 75 additions & 0 deletions x-pack/dockerlogbeat/pipelinemanager/config.go
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package pipelinemanager

import (
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
)

// ContainerOutputConfig has all the options we'll expect from --log-opts
type ContainerOutputConfig struct {
Endpoint []string `struct:"output.elasticsearch.hosts,omitempty"`
User string `struct:"output.elasticsearch.username,omitempty"`
Password string `struct:"output.elasticsearch.password,omitempty"`
Index string `struct:"output.elasticsearch.index,omitempty"`
Pipeline string `struct:"output.elasticsearch.pipeline,omitempty"`
APIKey string `struct:"output.elasticsearch.api_key,omitempty"`
Timeout string `struct:"output.elasticsearch.timeout,omitempty"`
BackoffInit string `struct:"output.elasticsearch.backoff.init,omitempty"`
BackoffMax string `struct:"output.elasticsearch.backoff.max,omitempty"`
CloudID string `struct:"cloud.id,omitempty"`
CloudAuth string `struct:"cloud.auth,omitempty"`
ProxyURL string `struct:"output.elasticsearch.proxy_url,omitempty"`
}

Reviewer note (for a follow-up PR?):
When publishing, the index and pipeline are set per event/document in the bulk request. We currently hash this whole structure to create a new pipeline, which means a new output with its own workers and in-memory event buffer for every index we want to publish to. Alternatively, we could distinguish between 'input' and 'output' settings and create a publishing pipeline (memory queue + output) only per distinct set of 'output' settings. The index and pipeline settings would become 'input' settings, allowing resources to be shared between multiple containers.
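
A rough sketch of the split the comment proposes, with hypothetical type names: hash only the connection-level 'output' settings, and carry index/pipeline as per-event 'input' settings so containers that differ only in those could share one pipeline.

["source","go"]
----
// Hypothetical illustration of the reviewer's suggestion: only
// outputSettings would be hashed to pick a publishing pipeline
// (memory queue + output), while inputSettings are applied per event.
type outputSettings struct {
	Endpoint           []string
	User, Password     string
	CloudID, CloudAuth string
	ProxyURL           string
}

type inputSettings struct {
	Index    string // set per document in the bulk request
	Pipeline string // likewise set per document
}

func splitSettings(cfg ContainerOutputConfig) (outputSettings, inputSettings) {
	out := outputSettings{
		Endpoint:  cfg.Endpoint,
		User:      cfg.User,
		Password:  cfg.Password,
		CloudID:   cfg.CloudID,
		CloudAuth: cfg.CloudAuth,
		ProxyURL:  cfg.ProxyURL,
	}
	in := inputSettings{Index: cfg.Index, Pipeline: cfg.Pipeline}
	return out, in
}
----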


// NewCfgFromRaw returns a ContainerOutputConfig based on a raw config we get from the API
func NewCfgFromRaw(input map[string]string) (ContainerOutputConfig, error) {

newCfg := ContainerOutputConfig{}
endpoint, ok := input["endpoint"]
if !ok {
return newCfg, errors.New("an endpoint flag is required")
}

endpointList := strings.Split(endpoint, ",")

newCfg.Endpoint = endpointList

newCfg.User = input["user"]
newCfg.Password = input["password"]
newCfg.Index = input["index"]
newCfg.Pipeline = input["pipeline"]
newCfg.CloudID = input["cloud_id"]
newCfg.CloudAuth = input["cloud_auth"]
newCfg.ProxyURL = input["proxy_url"]
newCfg.APIKey = input["api_key"]
newCfg.Timeout = input["timeout"]
newCfg.BackoffInit = input["backoff_init"]
newCfg.BackoffMax = input["backoff_max"]

return newCfg, nil
}

// CreateConfig converts the struct into a config object that can be absorbed by libbeat
func (cfg ContainerOutputConfig) CreateConfig() (*common.Config, error) {

// the use of typeconv is a hacky shim so we can implement `omitempty` where needed.
var tmp map[string]interface{}
err := typeconv.Convert(&tmp, cfg)
if err != nil {
return nil, errors.Wrap(err, "error converting config struct to interface")
}
cfgFinal, err := common.NewConfigFrom(tmp)
if err != nil {
return nil, errors.Wrap(err, "error creating config object")
}

return cfgFinal, nil
}
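
To see the two halves together, here is a hedged usage sketch (a hypothetical test, not part of the PR): raw `--log-opts` go through `NewCfgFromRaw`, and `CreateConfig` renders them under the nested `output.elasticsearch.*` keys declared in the struct tags above.

["source","go"]
----
package pipelinemanager

import "testing"

// TestCreateConfigSketch is a hypothetical usage example, not part of the PR.
func TestCreateConfigSketch(t *testing.T) {
	raw := map[string]string{
		"endpoint": "https://myhost:9200",
		"user":     "myusername",
		"password": "mypassword",
	}
	cfg, err := NewCfgFromRaw(raw)
	if err != nil {
		t.Fatal(err)
	}
	// CreateConfig rewrites the flat log-opts into the nested
	// output.elasticsearch.* layout that libbeat outputs consume.
	bCfg, err := cfg.CreateConfig()
	if err != nil {
		t.Fatal(err)
	}
	if !bCfg.HasField("output") {
		t.Error("expected an output section in the generated config")
	}
}
----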
22 changes: 7 additions & 15 deletions x-pack/dockerlogbeat/pipelinemanager/libbeattools.go
@@ -21,7 +21,6 @@ import (
"github.com/elastic/beats/v7/libbeat/cloudid"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/idxmgmt"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
@@ -50,14 +49,9 @@ func makeConfigHash(cfg map[string]string) string {
}

// load pipeline starts up a new pipeline with the given config
func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Logger) (*Pipeline, error) {
func loadNewPipeline(logOptsConfig ContainerOutputConfig, name string, log *logp.Logger) (*Pipeline, error) {

newCfg, err := parseCfgKeys(logOptsConfig)
if err != nil {
return nil, errors.Wrap(err, "error parsing config keys")
}

cfg, err := common.NewConfigFrom(newCfg)
cfg, err := logOptsConfig.CreateConfig()
if err != nil {
return nil, err
}
@@ -90,10 +84,7 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log
return nil, errors.Wrap(err, "error unpacking pipeline config")
}

idx, err := idxmgmt.DefaultSupport(log, info, config.Output.Config())
if err != nil {
return nil, errors.Wrap(err, "error making index manager")
}
idxMgr := newIndexSupporter(info)

settings := pipeline.Settings{
WaitClose: time.Duration(time.Second * 10),
@@ -111,7 +102,7 @@ func loadNewPipeline(logOptsConfig map[string]string, name string, log *logp.Log
pipelineCfg,
func(stat outputs.Observer) (string, outputs.Group, error) {
cfg := config.Output
out, err := outputs.Load(idx, info, stat, cfg.Name(), cfg.Config())
out, err := outputs.Load(idxMgr, info, stat, cfg.Name(), cfg.Config())
return cfg.Name(), out, err
},
settings,
@@ -161,16 +152,17 @@ func getBeatInfo(cfg *common.Config) (beat.Info, error) {
}

if name.Name == "" {
name.Name = "elastic-log-driver-" + hostname
name.Name = "elastic-log-driver"
}
id, err := loadMeta("/tmp/meta.json")
if err != nil {
return beat.Info{}, errors.Wrap(err, "error loading UUID")
}

info := beat.Info{
Beat: "elastic-logging-plugin",
Beat: name.Name,
Name: name.Name,
IndexPrefix: name.Name,
Hostname: hostname,
Version: vers,
EphemeralID: eid,
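
The `loadMeta("/tmp/meta.json")` call suggests the plugin persists a stable UUID across restarts. A hedged sketch of what such a helper might look like (the real implementation is not shown in this diff):

["source","go"]
----
package main

import (
	"encoding/json"
	"os"

	"github.com/gofrs/uuid"
)

type pluginMeta struct {
	UUID uuid.UUID `json:"uuid"`
}

// loadMetaSketch returns the UUID stored at path, generating and persisting
// one on first run. A sketch only; the PR's loadMeta is not shown here.
func loadMetaSketch(path string) (uuid.UUID, error) {
	raw, err := os.ReadFile(path)
	if err == nil {
		var m pluginMeta
		if err := json.Unmarshal(raw, &m); err == nil {
			return m.UUID, nil
		}
	}
	id, err := uuid.NewV4()
	if err != nil {
		return uuid.Nil, err
	}
	out, err := json.Marshal(pluginMeta{UUID: id})
	if err != nil {
		return uuid.Nil, err
	}
	return id, os.WriteFile(path, out, 0600)
}
----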