From 84796204097e5991329a98bd55c041eefe113e44 Mon Sep 17 00:00:00 2001 From: simitt Date: Thu, 28 May 2020 16:32:21 +0200 Subject: [PATCH 01/10] Ensure ECS compliant logging when enabled. If `logging.ecs` is set log data in ECS compliant way. closes #3796 --- _meta/beat.yml | 4 +- apm-server.docker.yml | 4 +- apm-server.yml | 4 +- beater/middleware/log_middleware.go | 130 +++++++++++------- beater/middleware/log_middleware_test.go | 104 +++++++------- .../docs/loggingconfig.asciidoc | 3 +- 6 files changed, 142 insertions(+), 107 deletions(-) diff --git a/_meta/beat.yml b/_meta/beat.yml index 968cbf0a8bb..08fd2496b0b 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -1066,8 +1066,8 @@ output.elasticsearch: # Set to true to log messages in json format. #logging.json: false -# Set to true, to log messages with minimal required Elastic Common Schema (ECS) -# information. Recommended to use in combination with `logging.json=true` +# Set to true, to log messages in Elastic Common Schema (ECS) compliant format. +# Recommended to use in combination with `logging.json=true` # Defaults to false. #logging.ecs: false diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 0058e9f036b..e087593a614 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -1066,8 +1066,8 @@ output.elasticsearch: # Set to true to log messages in json format. #logging.json: false -# Set to true, to log messages with minimal required Elastic Common Schema (ECS) -# information. Recommended to use in combination with `logging.json=true` +# Set to true, to log messages in Elastic Common Schema (ECS) compliant format. +# Recommended to use in combination with `logging.json=true` # Defaults to false. #logging.ecs: false diff --git a/apm-server.yml b/apm-server.yml index 9e66d7a411b..7422212666e 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -1066,8 +1066,8 @@ output.elasticsearch: # Set to true to log messages in json format. #logging.json: false -# Set to true, to log messages with minimal required Elastic Common Schema (ECS) -# information. Recommended to use in combination with `logging.json=true` +# Set to true, to log messages in Elastic Common Schema (ECS) compliant format. +# Recommended to use in combination with `logging.json=true` # Defaults to false. #logging.ecs: false diff --git a/beater/middleware/log_middleware.go b/beater/middleware/log_middleware.go index a7b8e5c7b19..92ea121aa63 100644 --- a/beater/middleware/log_middleware.go +++ b/beater/middleware/log_middleware.go @@ -36,67 +36,97 @@ func LogMiddleware() Middleware { return func(h request.Handler) (request.Handler, error) { return func(c *request.Context) { - var reqID, transactionID, traceID string - tx := apm.TransactionFromContext(c.Request.Context()) - if tx != nil { - // This request is being traced, grab its IDs to add to logs. - traceContext := tx.TraceContext() - transactionID = traceContext.Span.String() - traceID = traceContext.Trace.String() - reqID = transactionID - } else { - uuid, err := uuid.NewV4() - if err != nil { - id := request.IDResponseErrorsInternal - logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err) - c.Result.SetWithError(id, err) - c.Write() - return - } - reqID = uuid.String() + args, err := requestArgs(c, logger.ECSEnabled()) + if err != nil { + id := request.IDResponseErrorsInternal + logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err) + c.Result.SetWithError(id, err) + c.Write() + return } - - reqLogger := logger.With( - "request_id", reqID, - "method", c.Request.Method, - "URL", c.Request.URL, - "content_length", c.Request.ContentLength, - "remote_address", utility.RemoteAddr(c.Request), - "user-agent", c.Request.Header.Get(headers.UserAgent)) - - if traceID != "" { - reqLogger = reqLogger.With( - "trace.id", traceID, - "transaction.id", transactionID, - ) - } - - c.Logger = reqLogger + c.Logger = logger.With(args...) h(c) if c.MultipleWriteAttempts() { - reqLogger.Warn("multiple write attempts") + c.Logger.Warn("multiple write attempts") } - keyword := c.Result.Keyword if keyword == "" { keyword = "handled request" } - - keysAndValues := []interface{}{"response_code", c.Result.StatusCode} - if c.Result.Err != nil { - keysAndValues = append(keysAndValues, "error", c.Result.Err.Error()) - } - if c.Result.Stacktrace != "" { - keysAndValues = append(keysAndValues, "stacktrace", c.Result.Stacktrace) - } - + args = resultArgs(c, logger.ECSEnabled()) if c.Result.Failure() { - reqLogger.Errorw(keyword, keysAndValues...) - } else { - reqLogger.Infow(keyword, keysAndValues...) + c.Logger.Errorw(keyword, args...) + return } - + c.Logger.Infow(keyword, args...) }, nil } } + +func requestArgs(c *request.Context, ecsEnabled bool) ([]interface{}, error) { + var reqID, transactionID, traceID string + tx := apm.TransactionFromContext(c.Request.Context()) + if tx != nil { + // This request is being traced, grab its IDs to add to logs. + traceContext := tx.TraceContext() + transactionID = traceContext.Span.String() + traceID = traceContext.Trace.String() + reqID = transactionID + } else { + uuid, err := uuid.NewV4() + if err != nil { + return nil, err + } + reqID = uuid.String() + } + + args := []interface{}{ + "http", map[string]interface{}{ + "request": map[string]interface{}{ + "id": reqID, //not defined in ECS but fits here best + "method": c.Request.Method, + "body": map[string]interface{}{"bytes": c.Request.ContentLength}}}, + "source", map[string]interface{}{"address": utility.RemoteAddr(c.Request)}, + "user_agent", map[string]interface{}{"original": c.Request.Header.Get(headers.UserAgent)}, + } + if traceID != "" { + args = append(args, + "trace", map[string]interface{}{"id": traceID}, + "transaction", map[string]interface{}{"id": transactionID}) + } + // avoid conflicts on existing log keys + if ecsEnabled { + return append(args, "url", map[string]string{"original": c.Request.URL.String()}), nil + } + return append(args, "URL", c.Request.URL), nil +} + +func resultArgs(c *request.Context, ecsEnabled bool) []interface{} { + args := []interface{}{ + // http key will be duplicated at this point + "http", map[string]interface{}{ + "response": map[string]interface{}{ + "status_code": c.Result.StatusCode}}} + if c.Result.Err == nil && c.Result.Stacktrace == "" { + return args + } + + if ecsEnabled { + err := map[string]interface{}{} + if c.Result.Err != nil { + err["message"] = c.Result.Err.Error() + } + if c.Result.Stacktrace != "" { + err["stacktrace"] = c.Result.Stacktrace + } + return append(args, "error", err) + } + if c.Result.Err != nil { + args = append(args, "error", c.Result.Err) + } + if c.Result.Stacktrace != "" { + args = append(args, "stacktrace", c.Result.Stacktrace) + } + return args +} diff --git a/beater/middleware/log_middleware_test.go b/beater/middleware/log_middleware_test.go index 8337bd7d63e..300fbdbeef3 100644 --- a/beater/middleware/log_middleware_test.go +++ b/beater/middleware/log_middleware_test.go @@ -18,10 +18,10 @@ package middleware import ( + "fmt" "net/http" "testing" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -29,7 +29,9 @@ import ( "go.elastic.co/apm" "go.elastic.co/apm/apmtest" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/logp/configure" "github.com/elastic/apm-server/beater/beatertest" "github.com/elastic/apm-server/beater/headers" @@ -38,17 +40,14 @@ import ( ) func TestLogMiddleware(t *testing.T) { - err := logp.DevelopmentSetup(logp.ToObserverOutput()) - require.NoError(t, err) testCases := []struct { name, message string level zapcore.Level handler request.Handler code int - error error - stacktrace bool traced bool + keys, ecsKeys []string }{ { name: "Accepted", @@ -56,6 +55,8 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.InfoLevel, handler: beatertest.Handler202, code: http.StatusAccepted, + keys: []string{"URL"}, + ecsKeys: []string{"url.original"}, }, { name: "Traced", @@ -63,6 +64,8 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.InfoLevel, handler: beatertest.Handler202, code: http.StatusAccepted, + keys: []string{"URL", "trace.id", "transaction.id"}, + ecsKeys: []string{"url.original", "trace.id", "transaction.id"}, traced: true, }, { @@ -71,16 +74,17 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.ErrorLevel, handler: beatertest.Handler403, code: http.StatusForbidden, - error: errors.New("forbidden request"), + keys: []string{"URL", "error"}, + ecsKeys: []string{"url.original", "error.message"}, }, { - name: "Panic", - message: "internal error", - level: zapcore.ErrorLevel, - handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic), - code: http.StatusInternalServerError, - error: errors.New("panic on Handle"), - stacktrace: true, + name: "Panic", + message: "internal error", + level: zapcore.ErrorLevel, + handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic), + code: http.StatusInternalServerError, + keys: []string{"URL", "error", "stacktrace"}, + ecsKeys: []string{"url.original", "error.message", "error.stacktrace"}, }, { name: "Error without keyword", @@ -90,52 +94,54 @@ func TestLogMiddleware(t *testing.T) { c.Result.StatusCode = http.StatusForbidden c.Write() }, - code: http.StatusForbidden, + code: http.StatusForbidden, + keys: []string{"URL"}, + ecsKeys: []string{"url.original"}, }, } for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - c, rec := beatertest.DefaultContextWithResponseRecorder() - c.Request.Header.Set(headers.UserAgent, tc.name) - if tc.traced { - tx := apmtest.DiscardTracer.StartTransaction("name", "type") - c.Request = c.Request.WithContext(apm.ContextWithTransaction(c.Request.Context(), tx)) - defer tx.End() - } - Apply(LogMiddleware(), tc.handler)(c) + for _, withECS := range []bool{true, false} { + name := fmt.Sprintf("%sWithECS%v", tc.name, withECS) + t.Run(name, func(t *testing.T) { + if withECS { + configure.Logging("APM Server test", + common.MustNewConfigFrom(`{"ecs":true}`)) + } + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err) + c, rec := beatertest.DefaultContextWithResponseRecorder() + c.Request.Header.Set(headers.UserAgent, tc.name) + if tc.traced { + tx := apmtest.DiscardTracer.StartTransaction("name", "type") + c.Request = c.Request.WithContext(apm.ContextWithTransaction(c.Request.Context(), tx)) + defer tx.End() + } + Apply(LogMiddleware(), tc.handler)(c) - assert.Equal(t, tc.code, rec.Code) - for i, entry := range logp.ObserverLogs().TakeAll() { - // expect only one log entry per request - assert.Equal(t, i, 0) + assert.Equal(t, tc.code, rec.Code) + entries := logp.ObserverLogs().TakeAll() + require.Equal(t, 1, len(entries)) + entry := entries[0] assert.Equal(t, logs.Request, entry.LoggerName) assert.Equal(t, tc.level, entry.Level) assert.Equal(t, tc.message, entry.Message) - ec := entry.ContextMap() - assert.NotEmpty(t, ec["request_id"]) - assert.NotEmpty(t, ec["method"]) - assert.Equal(t, c.Request.URL.String(), ec["URL"]) - assert.NotEmpty(t, ec["remote_address"]) - assert.Equal(t, c.Request.Header.Get(headers.UserAgent), ec["user-agent"]) - // zap encoded type - assert.Equal(t, tc.code, int(ec["response_code"].(int64))) - if tc.error != nil { - assert.Equal(t, tc.error.Error(), ec["error"]) - } - if tc.stacktrace { - assert.NotZero(t, ec["stacktrace"]) + encoder := zapcore.NewMapObjectEncoder() + ec := common.MapStr{} + for _, f := range entry.Context { + f.AddTo(encoder) + ec.DeepUpdate(encoder.Fields) } - if tc.traced { - assert.NotEmpty(t, ec, "trace.id") - assert.NotEmpty(t, ec, "transaction.id") - assert.Equal(t, ec["request_id"], ec["transaction.id"]) - } else { - assert.NotContains(t, ec, "trace.id") - assert.NotContains(t, ec, "transaction.id") + keys := []string{"http", "http.request.id", "http.request.method", "http.request.body", + "source.address", "user_agent.original", "http.response.status_code"} + keys = append(keys, tc.keys...) + for _, key := range keys { + ok, _ := ec.HasKey(key) + assert.True(t, ok, key) } - } - }) + }) + + } } } diff --git a/docs/copied-from-beats/docs/loggingconfig.asciidoc b/docs/copied-from-beats/docs/loggingconfig.asciidoc index 6c535cfdedc..3a1cc925461 100644 --- a/docs/copied-from-beats/docs/loggingconfig.asciidoc +++ b/docs/copied-from-beats/docs/loggingconfig.asciidoc @@ -238,8 +238,7 @@ When true, logs messages in JSON format. The default is false. [float] ==== `logging.ecs` -When true, logs messages with minimal required Elastic Common Schema (ECS) -information. +When true, logs messages in Elastic Common Schema (ECS) compliant format. ifndef::serverless[] [float] From abf4fd2958ae7ac781de9b7ab1bdeba4cd9cbc41 Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 16 Jun 2020 11:04:08 +0200 Subject: [PATCH 02/10] get rid of ECSEnabled check, use logger.With --- beater/middleware/log_middleware.go | 97 +++++++++--------------- beater/middleware/log_middleware_test.go | 83 +++++++++----------- 2 files changed, 72 insertions(+), 108 deletions(-) diff --git a/beater/middleware/log_middleware.go b/beater/middleware/log_middleware.go index 92ea121aa63..88468846646 100644 --- a/beater/middleware/log_middleware.go +++ b/beater/middleware/log_middleware.go @@ -32,21 +32,18 @@ import ( // LogMiddleware returns a middleware taking care of logging processing a request in the middleware and the request handler func LogMiddleware() Middleware { - logger := logp.NewLogger(logs.Request) return func(h request.Handler) (request.Handler, error) { - return func(c *request.Context) { - args, err := requestArgs(c, logger.ECSEnabled()) - if err != nil { + c.Logger = loggerWithContext(c) + var err error + if c.Logger, err = loggerWithTraceContext(c); err != nil { id := request.IDResponseErrorsInternal - logger.Errorw(request.MapResultIDToStatus[id].Keyword, "error", err) + c.Logger.Error(request.MapResultIDToStatus[id].Keyword, logp.Error(err)) c.Result.SetWithError(id, err) c.Write() return } - c.Logger = logger.With(args...) h(c) - if c.MultipleWriteAttempts() { c.Logger.Warn("multiple write attempts") } @@ -54,79 +51,55 @@ func LogMiddleware() Middleware { if keyword == "" { keyword = "handled request" } - args = resultArgs(c, logger.ECSEnabled()) + c.Logger = loggerWithResult(c) if c.Result.Failure() { - c.Logger.Errorw(keyword, args...) + c.Logger.Error(keyword) return } - c.Logger.Infow(keyword, args...) + c.Logger.Info(keyword) }, nil } } -func requestArgs(c *request.Context, ecsEnabled bool) ([]interface{}, error) { - var reqID, transactionID, traceID string +func loggerWithContext(c *request.Context) *logp.Logger { + return logp.NewLogger(logs.Request).With( + "http.request.method", c.Request.Method, + "http.request.body.bytes", c.Request.ContentLength, + "source.address", utility.RemoteAddr(c.Request), + "user_agent.original", c.Request.Header.Get(headers.UserAgent), + "url.original", c.Request.URL.String()) +} + +func loggerWithTraceContext(c *request.Context) (*logp.Logger, error) { tx := apm.TransactionFromContext(c.Request.Context()) - if tx != nil { - // This request is being traced, grab its IDs to add to logs. - traceContext := tx.TraceContext() - transactionID = traceContext.Span.String() - traceID = traceContext.Trace.String() - reqID = transactionID - } else { + if tx == nil { uuid, err := uuid.NewV4() if err != nil { - return nil, err + return c.Logger, err } - reqID = uuid.String() - } - - args := []interface{}{ - "http", map[string]interface{}{ - "request": map[string]interface{}{ - "id": reqID, //not defined in ECS but fits here best - "method": c.Request.Method, - "body": map[string]interface{}{"bytes": c.Request.ContentLength}}}, - "source", map[string]interface{}{"address": utility.RemoteAddr(c.Request)}, - "user_agent", map[string]interface{}{"original": c.Request.Header.Get(headers.UserAgent)}, + return c.Logger.With("http.request.id", uuid.String()), nil } - if traceID != "" { - args = append(args, - "trace", map[string]interface{}{"id": traceID}, - "transaction", map[string]interface{}{"id": transactionID}) - } - // avoid conflicts on existing log keys - if ecsEnabled { - return append(args, "url", map[string]string{"original": c.Request.URL.String()}), nil - } - return append(args, "URL", c.Request.URL), nil + // This request is being traced, grab its IDs to add to logs. + traceContext := tx.TraceContext() + transactionID := traceContext.Span.String() + return c.Logger.With( + "trace.id", traceContext.Trace.String(), + "transaction.id", transactionID, + "http.request.id", transactionID, + ), nil } -func resultArgs(c *request.Context, ecsEnabled bool) []interface{} { - args := []interface{}{ - // http key will be duplicated at this point - "http", map[string]interface{}{ - "response": map[string]interface{}{ - "status_code": c.Result.StatusCode}}} +func loggerWithResult(c *request.Context) *logp.Logger { + logger := c.Logger.With( + "http.response.status_code", c.Result.StatusCode) if c.Result.Err == nil && c.Result.Stacktrace == "" { - return args - } - - if ecsEnabled { - err := map[string]interface{}{} - if c.Result.Err != nil { - err["message"] = c.Result.Err.Error() - } - if c.Result.Stacktrace != "" { - err["stacktrace"] = c.Result.Stacktrace - } - return append(args, "error", err) + return logger } if c.Result.Err != nil { - args = append(args, "error", c.Result.Err) + logger = logger.With("error.message", c.Result.Err.Error()) } if c.Result.Stacktrace != "" { - args = append(args, "stacktrace", c.Result.Stacktrace) + logger = logger.With("error.stacktrace", c.Result.Stacktrace) } - return args + return logger } diff --git a/beater/middleware/log_middleware_test.go b/beater/middleware/log_middleware_test.go index 300fbdbeef3..97981bcbe16 100644 --- a/beater/middleware/log_middleware_test.go +++ b/beater/middleware/log_middleware_test.go @@ -18,7 +18,6 @@ package middleware import ( - "fmt" "net/http" "testing" @@ -47,7 +46,7 @@ func TestLogMiddleware(t *testing.T) { handler request.Handler code int traced bool - keys, ecsKeys []string + ecsKeys []string }{ { name: "Accepted", @@ -55,7 +54,6 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.InfoLevel, handler: beatertest.Handler202, code: http.StatusAccepted, - keys: []string{"URL"}, ecsKeys: []string{"url.original"}, }, { @@ -64,7 +62,6 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.InfoLevel, handler: beatertest.Handler202, code: http.StatusAccepted, - keys: []string{"URL", "trace.id", "transaction.id"}, ecsKeys: []string{"url.original", "trace.id", "transaction.id"}, traced: true, }, @@ -74,7 +71,6 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.ErrorLevel, handler: beatertest.Handler403, code: http.StatusForbidden, - keys: []string{"URL", "error"}, ecsKeys: []string{"url.original", "error.message"}, }, { @@ -83,7 +79,6 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.ErrorLevel, handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic), code: http.StatusInternalServerError, - keys: []string{"URL", "error", "stacktrace"}, ecsKeys: []string{"url.original", "error.message", "error.stacktrace"}, }, { @@ -95,53 +90,49 @@ func TestLogMiddleware(t *testing.T) { c.Write() }, code: http.StatusForbidden, - keys: []string{"URL"}, ecsKeys: []string{"url.original"}, }, } for _, tc := range testCases { - for _, withECS := range []bool{true, false} { - name := fmt.Sprintf("%sWithECS%v", tc.name, withECS) - t.Run(name, func(t *testing.T) { - if withECS { - configure.Logging("APM Server test", - common.MustNewConfigFrom(`{"ecs":true}`)) - } - err := logp.DevelopmentSetup(logp.ToObserverOutput()) - require.NoError(t, err) - c, rec := beatertest.DefaultContextWithResponseRecorder() - c.Request.Header.Set(headers.UserAgent, tc.name) - if tc.traced { - tx := apmtest.DiscardTracer.StartTransaction("name", "type") - c.Request = c.Request.WithContext(apm.ContextWithTransaction(c.Request.Context(), tx)) - defer tx.End() - } - Apply(LogMiddleware(), tc.handler)(c) + t.Run(tc.name, func(t *testing.T) { + // log setup + configure.Logging("APM Server test", + common.MustNewConfigFrom(`{"ecs":true}`)) + require.NoError(t, logp.DevelopmentSetup(logp.ToObserverOutput())) - assert.Equal(t, tc.code, rec.Code) - entries := logp.ObserverLogs().TakeAll() - require.Equal(t, 1, len(entries)) - entry := entries[0] - assert.Equal(t, logs.Request, entry.LoggerName) - assert.Equal(t, tc.level, entry.Level) - assert.Equal(t, tc.message, entry.Message) + // prepare and record request + c, rec := beatertest.DefaultContextWithResponseRecorder() + c.Request.Header.Set(headers.UserAgent, tc.name) + if tc.traced { + tx := apmtest.DiscardTracer.StartTransaction("name", "type") + c.Request = c.Request.WithContext(apm.ContextWithTransaction(c.Request.Context(), tx)) + defer tx.End() + } + Apply(LogMiddleware(), tc.handler)(c) - encoder := zapcore.NewMapObjectEncoder() - ec := common.MapStr{} - for _, f := range entry.Context { - f.AddTo(encoder) - ec.DeepUpdate(encoder.Fields) - } - keys := []string{"http", "http.request.id", "http.request.method", "http.request.body", - "source.address", "user_agent.original", "http.response.status_code"} - keys = append(keys, tc.keys...) - for _, key := range keys { - ok, _ := ec.HasKey(key) - assert.True(t, ok, key) - } - }) + // check log lines + assert.Equal(t, tc.code, rec.Code) + entries := logp.ObserverLogs().TakeAll() + require.Equal(t, 1, len(entries)) + entry := entries[0] + assert.Equal(t, logs.Request, entry.LoggerName) + assert.Equal(t, tc.level, entry.Level) + assert.Equal(t, tc.message, entry.Message) - } + encoder := zapcore.NewMapObjectEncoder() + ec := common.MapStr{} + for _, f := range entry.Context { + f.AddTo(encoder) + ec.DeepUpdate(encoder.Fields) + } + keys := []string{"http.request.id", "http.request.method", "http.request.body.bytes", + "source.address", "user_agent.original", "http.response.status_code"} + keys = append(keys, tc.ecsKeys...) + for _, key := range keys { + ok, _ := ec.HasKey(key) + assert.True(t, ok, key) + } + }) } } From 75f20c692a850d02d0db019af93526fba35c0c7d Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 16 Jun 2020 11:19:41 +0200 Subject: [PATCH 03/10] enable ecs.logging by default --- _meta/beat.yml | 8 ++++---- apm-server.docker.yml | 8 ++++---- apm-server.yml | 8 ++++---- cmd/root.go | 1 + 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/_meta/beat.yml b/_meta/beat.yml index 08fd2496b0b..ca2de20b6ed 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -1066,10 +1066,10 @@ output.elasticsearch: # Set to true to log messages in json format. #logging.json: false -# Set to true, to log messages in Elastic Common Schema (ECS) compliant format. -# Recommended to use in combination with `logging.json=true` -# Defaults to false. -#logging.ecs: false +# Set to true, to log messages with minimal required Elastic Common Schema (ECS) +# information. Recommended to use in combination with `logging.json=true` +# Defaults to true. +#logging.ecs: true #=============================== HTTP Endpoint =============================== diff --git a/apm-server.docker.yml b/apm-server.docker.yml index e087593a614..67ba67e8e1a 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -1066,10 +1066,10 @@ output.elasticsearch: # Set to true to log messages in json format. #logging.json: false -# Set to true, to log messages in Elastic Common Schema (ECS) compliant format. -# Recommended to use in combination with `logging.json=true` -# Defaults to false. -#logging.ecs: false +# Set to true, to log messages with minimal required Elastic Common Schema (ECS) +# information. Recommended to use in combination with `logging.json=true` +# Defaults to true. +#logging.ecs: true #=============================== HTTP Endpoint =============================== diff --git a/apm-server.yml b/apm-server.yml index 7422212666e..0e6db52ae33 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -1066,10 +1066,10 @@ output.elasticsearch: # Set to true to log messages in json format. #logging.json: false -# Set to true, to log messages in Elastic Common Schema (ECS) compliant format. -# Recommended to use in combination with `logging.json=true` -# Defaults to false. -#logging.ecs: false +# Set to true, to log messages with minimal required Elastic Common Schema (ECS) +# information. Recommended to use in combination with `logging.json=true` +# Defaults to true. +#logging.ecs: true #=============================== HTTP Endpoint =============================== diff --git a/cmd/root.go b/cmd/root.go index 6dbe3edb39b..18a3899e8eb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -47,6 +47,7 @@ var libbeatConfigOverrides = common.MustNewConfigFrom(map[string]interface{}{ "files": map[string]interface{}{ "rotateeverybytes": 10 * 1024 * 1024, }, + "ecs": true, }, "setup": map[string]interface{}{ "template": map[string]interface{}{ From 9c0d1f51200200c5b2390821e95591e041995eff Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 16 Jun 2020 11:30:42 +0200 Subject: [PATCH 04/10] Add changelog --- changelogs/head.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 11d50169508..36b4321e7b7 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -20,3 +20,4 @@ https://github.com/elastic/apm-server/compare/7.8\...master[View commits] * Jaeger traceIds/spanIds are formatted without leading zeros {pull}3849[3849] * Index Page URL and referer as ECS fields {pull}3857[3857] * Map the Jaeger attribute message_bus.destination to the Elastic APM type messaging {pull}3884[3884] +* Switch logging format to be ECS compliant where possible {pull}3829[3829] From 51d59db8e610d8418f99e066e7b9b73bc145b8b3 Mon Sep 17 00:00:00 2001 From: simitt Date: Wed, 17 Jun 2020 10:17:35 +0200 Subject: [PATCH 05/10] Fix system tests --- tests/system/apmserver.py | 4 +- tests/system/config/apm-server.yml.j2 | 31 ++------------ tests/system/test_export.py | 2 +- tests/system/test_integration_acm.py | 51 ++++++++++++------------ tests/system/test_integration_logging.py | 18 ++++----- tests/system/test_jaeger.py | 1 + 6 files changed, 43 insertions(+), 64 deletions(-) diff --git a/tests/system/apmserver.py b/tests/system/apmserver.py index cafee22e62d..dc6ae02a57a 100644 --- a/tests/system/apmserver.py +++ b/tests/system/apmserver.py @@ -344,8 +344,8 @@ def check_for_no_smap(self, doc): def logged_requests(self, url="/intake/v2/events"): for line in self.get_log_lines(): jline = json.loads(line) - u = urlparse(jline.get("URL", "")) - if jline.get("logger") == "request" and u.path == url: + u = urlparse(jline.get("url.original", "")) + if jline.get("log.logger") == "request" and u.path == url: yield jline def approve_docs(self, base_path, received): diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2 index d2ba5df201f..e478bf37841 100644 --- a/tests/system/config/apm-server.yml.j2 +++ b/tests/system/config/apm-server.yml.j2 @@ -278,22 +278,9 @@ output.elasticsearch: ############################# Logging ######################################### -{% if logging_json or logging_level %} +{% if logging_json or logging_level or logging_ecs_disabled %} logging: -{% else %} -#logging: {% endif %} - # Send all logging output to syslog. On Windows default is false, otherwise - # default is true. - #to_syslog: true - - # Write all logging output to files. Beats automatically rotate files if configurable - # limit is reached. - #to_files: false - - # Enable debug output for selected components. - #selectors: [] - {% if logging_json %} # Set to true to log messages in json format. json: {{ logging_json }} @@ -304,19 +291,9 @@ logging: level: {{ logging_level }} {% endif %} - #files: - # The directory where the log files will written to. - #path: /var/log/apm-server - - # The name of the files where the logs are written to. - #name: apm-server - - # Configure log file size limit. If limit is reached, log file will be - # automatically rotated - #rotateeverybytes: 10485760 # = 10MB - - # Number of rotated log files to keep. Oldest files will be deleted first. - #keepfiles: 7 +{% if logging_ecs_disabled %} + ecs: false +{% endif %} queue.mem.flush.min_events: {{ queue_flush }} diff --git a/tests/system/test_export.py b/tests/system/test_export.py index b523c9878d8..3c117ff23d1 100644 --- a/tests/system/test_export.py +++ b/tests/system/test_export.py @@ -30,7 +30,7 @@ def test_export_config(self): config = yaml.load(self.command_output) # logging settings self.assertDictEqual( - {"metrics": {"enabled": False}, 'files': {'rotateeverybytes': 10485760}, }, config["logging"] + {"metrics": {"enabled": False}, "files": {"rotateeverybytes": 10485760}, "ecs": True}, config["logging"] ) # template settings diff --git a/tests/system/test_integration_acm.py b/tests/system/test_integration_acm.py index 6eb388f8dd6..2513d785590 100644 --- a/tests/system/test_integration_acm.py +++ b/tests/system/test_integration_acm.py @@ -42,10 +42,10 @@ def test_config_requests(self): ) assert r1.status_code == 400, r1.status_code expect_log.append({ - "level": "error", + "log.level": "error", "message": "invalid query", - "error": "service.name is required", - "response_code": 400, + "error.message": "service.name is required", + "http.response.status_code": 400, }) # no configuration for service @@ -55,9 +55,9 @@ def test_config_requests(self): ) assert r2.status_code == 200, r2.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) self.assertDictEqual({}, r2.json()) @@ -70,9 +70,9 @@ def test_config_requests(self): assert r3.status_code == 200, r3.status_code # TODO (gr): validate Cache-Control header - https://github.com/elastic/apm-server/issues/2438 expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) self.assertDictEqual({"transaction_sample_rate": "0.05"}, r3.json()) @@ -85,9 +85,9 @@ def test_config_requests(self): }) assert r3_again.status_code == 304, r3_again.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "not modified", - "response_code": 304, + "http.response.status_code": 304, }) self.create_service_config( @@ -103,9 +103,9 @@ def test_config_requests(self): assert r4.status_code == 200, r4.status_code self.assertDictEqual({"transaction_sample_rate": "0.15"}, r4.json()) expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) # not modified on re-request @@ -120,9 +120,9 @@ def test_config_requests(self): }) assert r4_again.status_code == 304, r4_again.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "not modified", - "response_code": 304, + "http.response.status_code": 304, }) self.update_service_config( @@ -144,9 +144,9 @@ def test_config_requests(self): assert r4_post_update.status_code == 200, r4_post_update.status_code self.assertDictEqual({"transaction_sample_rate": "0.99"}, r4_post_update.json()) expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) # configuration for service+environment (all includes non existing) @@ -158,14 +158,15 @@ def test_config_requests(self): headers={"Content-Type": "application/json"}) assert r5.status_code == 200, r5.status_code expect_log.append({ - "level": "info", + "log.level": "info", "message": "request ok", - "response_code": 200, + "http.response.status_code": 200, }) self.assertDictEqual({"transaction_sample_rate": "0.05"}, r5.json()) config_request_logs = list(self.logged_requests(url="/config/v1/agents")) - assert len(config_request_logs) == len(expect_log) + assert len(config_request_logs) == len( + expect_log), "expected\n{}\nreceived\n{}".format(expect_log, config_request_logs) for want, got in zip(expect_log, config_request_logs): self.assertDictContainsSubset(want, got) @@ -207,15 +208,15 @@ def test_config_requests(self): config_request_logs = list(self.logged_requests(url="/config/v1/agents")) assert len(config_request_logs) == 2, config_request_logs self.assertDictContainsSubset({ - "level": "error", + "log.level": "error", "message": "unauthorized", - "error": "unauthorized", - "response_code": 401, + "error.message": "unauthorized", + "http.response.status_code": 401, }, config_request_logs[0]) self.assertDictContainsSubset({ - "level": "error", + "log.level": "error", "message": "unable to retrieve connection to Kibana", - "response_code": 503, + "http.response.status_code": 503, }, config_request_logs[1]) @@ -234,9 +235,9 @@ def test_log_kill_switch_active(self): assert r.status_code == 403, r.status_code config_request_logs = list(self.logged_requests(url="/config/v1/agents")) self.assertDictContainsSubset({ - "level": "error", + "log.level": "error", "message": "forbidden request", - "response_code": 403, + "http.response.status_code": 403, }, config_request_logs[0]) diff --git a/tests/system/test_integration_logging.py b/tests/system/test_integration_logging.py index facd519ce9a..d3eb91a1834 100644 --- a/tests/system/test_integration_logging.py +++ b/tests/system/test_integration_logging.py @@ -20,9 +20,9 @@ def test_log_valid_event(self): assert len(intake_request_logs) == 1, "multiple requests found" req = intake_request_logs[0] self.assertDictContainsSubset({ - "level": "info", + "log.level": "info", "message": "request accepted", - "response_code": 202, + "http.response.status_code": 202, }, req) def test_log_invalid_event(self): @@ -35,11 +35,11 @@ def test_log_invalid_event(self): assert len(intake_request_logs) == 1, "multiple requests found" req = intake_request_logs[0] self.assertDictContainsSubset({ - "level": "error", + "log.level": "error", "message": "data validation error", - "response_code": 400, + "http.response.status_code": 400, }, req) - error = req.get("error") + error = req.get("error.message") assert error.startswith("failed to validate transaction: error validating JSON:"), json.dumps(req) @@ -60,11 +60,11 @@ def test_log_event_size_exceeded(self): assert len(intake_request_logs) == 1, "multiple requests found" req = intake_request_logs[0] self.assertDictContainsSubset({ - "level": "error", + "log.level": "error", "message": "request body too large", - "response_code": 400, + "http.response.status_code": 400, }, req) - error = req.get("error") + error = req.get("error.message") assert error.startswith("event exceeded the permitted size."), json.dumps(req) @@ -86,7 +86,7 @@ def test_trace_ids(self): req = intake_request_logs[0] self.assertIn("trace.id", req) self.assertIn("transaction.id", req) - self.assertEqual(req["transaction.id"], req["request_id"]) + self.assertEqual(req["transaction.id"], req["http.request.id"]) class LoggingToEnvContainer(ServerBaseTest): diff --git a/tests/system/test_jaeger.py b/tests/system/test_jaeger.py index cddbc47f62f..4d56b4b4a9a 100644 --- a/tests/system/test_jaeger.py +++ b/tests/system/test_jaeger.py @@ -33,6 +33,7 @@ def config(self): # check that the authorization tag is always removed, # even if there's no secret token / API Key auth. "jaeger_grpc_auth_tag": "authorization", + "logging_ecs_disabled": "true", }) return cfg From 43e9370d03f6a95c7cb141dd055ab2d22c0d9134 Mon Sep 17 00:00:00 2001 From: simitt Date: Thu, 18 Jun 2020 09:46:56 +0200 Subject: [PATCH 06/10] Adressing PR review comments --- beater/middleware/log_middleware.go | 20 ++++++++++---------- beater/middleware/log_middleware_test.go | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/beater/middleware/log_middleware.go b/beater/middleware/log_middleware.go index 88468846646..55b3caaddd8 100644 --- a/beater/middleware/log_middleware.go +++ b/beater/middleware/log_middleware.go @@ -34,7 +34,7 @@ import ( func LogMiddleware() Middleware { return func(h request.Handler) (request.Handler, error) { return func(c *request.Context) { - c.Logger = loggerWithContext(c) + c.Logger = loggerWithRequestContext(c) var err error if c.Logger, err = loggerWithTraceContext(c); err != nil { id := request.IDResponseErrorsInternal @@ -61,13 +61,16 @@ func LogMiddleware() Middleware { } } -func loggerWithContext(c *request.Context) *logp.Logger { - return logp.NewLogger(logs.Request).With( +func loggerWithRequestContext(c *request.Context) *logp.Logger { + logger := logp.NewLogger(logs.Request).With( + "url.original", c.Request.URL.String(), "http.request.method", c.Request.Method, - "http.request.body.bytes", c.Request.ContentLength, - "source.address", utility.RemoteAddr(c.Request), "user_agent.original", c.Request.Header.Get(headers.UserAgent), - "url.original", c.Request.URL.String()) + "source.address", utility.RemoteAddr(c.Request)) + if c.Request.ContentLength != -1 { + logger = logger.With("http.request.body.bytes", c.Request.ContentLength) + } + return logger } func loggerWithTraceContext(c *request.Context) (*logp.Logger, error) { @@ -92,14 +95,11 @@ func loggerWithTraceContext(c *request.Context) (*logp.Logger, error) { func loggerWithResult(c *request.Context) *logp.Logger { logger := c.Logger.With( "http.response.status_code", c.Result.StatusCode) - if c.Result.Err == nil && c.Result.Stacktrace == "" { - return logger - } if c.Result.Err != nil { logger = logger.With("error.message", c.Result.Err.Error()) } if c.Result.Stacktrace != "" { - logger = logger.With("error.stacktrace", c.Result.Stacktrace) + logger = logger.With("error.stack_trace", c.Result.Stacktrace) } return logger } diff --git a/beater/middleware/log_middleware_test.go b/beater/middleware/log_middleware_test.go index 97981bcbe16..04ca498c51f 100644 --- a/beater/middleware/log_middleware_test.go +++ b/beater/middleware/log_middleware_test.go @@ -79,7 +79,7 @@ func TestLogMiddleware(t *testing.T) { level: zapcore.ErrorLevel, handler: Apply(RecoverPanicMiddleware(), beatertest.HandlerPanic), code: http.StatusInternalServerError, - ecsKeys: []string{"url.original", "error.message", "error.stacktrace"}, + ecsKeys: []string{"url.original", "error.message", "error.stack_trace"}, }, { name: "Error without keyword", From 8a1748ce1e356ed9ecfda3000ccc985971843cbd Mon Sep 17 00:00:00 2001 From: simitt Date: Thu, 10 Sep 2020 13:24:59 +0200 Subject: [PATCH 07/10] Adapt systemtests to changed logging keys --- systemtest/apmservertest/server.go | 14 +++++++------- systemtest/logging_test.go | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/systemtest/apmservertest/server.go b/systemtest/apmservertest/server.go index bfbc3dfc4b4..74b7249e54a 100644 --- a/systemtest/apmservertest/server.go +++ b/systemtest/apmservertest/server.go @@ -286,11 +286,11 @@ func (s *Server) consumeStderr(procStderr io.Reader) { s.Stderr = stderrPipeReader type logEntry struct { - Timestamp logpTimestamp - Level zapcore.Level - Logger string - Caller string - Message string + Timestamp logpTimestamp `json:"timestamp"` + Level zapcore.Level `json:"log.level"` + Logger string `json:"log.logger"` + Caller string `json:"caller"` + Message string `json:"message"` } decoder := json.NewDecoder(procStderr) @@ -308,8 +308,8 @@ func (s *Server) consumeStderr(procStderr io.Reader) { break } delete(fields, "timestamp") - delete(fields, "level") - delete(fields, "logger") + delete(fields, "log.level") + delete(fields, "log.logger") delete(fields, "caller") delete(fields, "message") s.Logs.add(LogEntry{ diff --git a/systemtest/logging_test.go b/systemtest/logging_test.go index 454ec906325..95d95ba9605 100644 --- a/systemtest/logging_test.go +++ b/systemtest/logging_test.go @@ -69,8 +69,8 @@ func TestAPMServerRequestLoggingValid(t *testing.T) { srv.Close() for _, entry := range srv.Logs.All() { - if entry.Logger == "request" && entry.Fields["URL"] == "/intake/v2/events" { - statusCode, _ := entry.Fields["response_code"].(float64) + if entry.Logger == "request" && entry.Fields["url.original"] == "/intake/v2/events" { + statusCode, _ := entry.Fields["http.response.status_code"].(float64) logEntries = append(logEntries, entry) requestEntries = append(requestEntries, requestEntry{ level: entry.Level, @@ -95,8 +95,8 @@ func TestAPMServerRequestLoggingValid(t *testing.T) { }}, requestEntries) assert.NotContains(t, logEntries[0].Fields, "error") - assert.Regexp(t, "failed to validate transaction: .*", logEntries[1].Fields["error"]) - assert.Equal(t, "event exceeded the permitted size.", logEntries[2].Fields["error"]) + assert.Regexp(t, "failed to validate transaction: .*", logEntries[1].Fields["error.message"]) + assert.Equal(t, "event exceeded the permitted size.", logEntries[2].Fields["error.message"]) } // validMetadataJSON returns valid JSON-encoded metadata, From 27fd10aebe7ab735ee268c2010ce0df755af213c Mon Sep 17 00:00:00 2001 From: simitt Date: Fri, 4 Dec 2020 11:05:53 +0100 Subject: [PATCH 08/10] update systemtest --- systemtest/export_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/systemtest/export_test.go b/systemtest/export_test.go index b11940d7245..03bcae3086a 100644 --- a/systemtest/export_test.go +++ b/systemtest/export_test.go @@ -49,6 +49,7 @@ func TestExportConfigDefaults(t *testing.T) { expectedConfig := strings.ReplaceAll(` logging: + ecs: false metrics: enabled: false path: @@ -69,6 +70,7 @@ func TestExportConfigOverrideDefaults(t *testing.T) { expectedConfig := strings.ReplaceAll(` logging: + ecs: true metrics: enabled: true path: From 076e7ca5baa3a39145f68a50b324883fa5db2da9 Mon Sep 17 00:00:00 2001 From: simitt Date: Fri, 4 Dec 2020 16:31:11 +0100 Subject: [PATCH 09/10] fix test --- systemtest/export_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/systemtest/export_test.go b/systemtest/export_test.go index 03bcae3086a..51849acfbc0 100644 --- a/systemtest/export_test.go +++ b/systemtest/export_test.go @@ -49,7 +49,7 @@ func TestExportConfigDefaults(t *testing.T) { expectedConfig := strings.ReplaceAll(` logging: - ecs: false + ecs: true metrics: enabled: false path: From 9aae7b681a633a50ff6f42e78f08df8926f6539f Mon Sep 17 00:00:00 2001 From: simitt Date: Tue, 8 Dec 2020 20:42:38 +0100 Subject: [PATCH 10/10] fix timestamp --- systemtest/apmservertest/server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/systemtest/apmservertest/server.go b/systemtest/apmservertest/server.go index 93ed399a7c5..5882a222539 100644 --- a/systemtest/apmservertest/server.go +++ b/systemtest/apmservertest/server.go @@ -288,7 +288,7 @@ func (s *Server) consumeStderr(procStderr io.Reader) { s.Stderr = stderrPipeReader type logEntry struct { - Timestamp logpTimestamp `json:"timestamp"` + Timestamp logpTimestamp `json:"@timestamp"` Message string `json:"message"` Level zapcore.Level `json:"log.level"` Logger string `json:"log.logger"` @@ -312,7 +312,7 @@ func (s *Server) consumeStderr(procStderr io.Reader) { if err := json.Unmarshal(raw, &fields); err != nil { break } - delete(fields, "timestamp") + delete(fields, "@timestamp") delete(fields, "log.level") delete(fields, "log.logger") delete(fields, "log.origin")