diff --git a/CHANGELOG.md b/CHANGELOG.md index 9eb63f7884e..dab41cfcf0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [CHANGE] Histogram `cortex_memcache_request_duration_seconds` `method` label value changes from `Memcached.Get` to `Memcached.GetBatched` for batched lookups, and is not reported for non-batched lookups (label value `Memcached.GetMulti` remains, and had exactly the same value as `Get` in nonbatched lookups). The same change applies to tracing spans. #3046 * [CHANGE] TLS server validation is now enabled by default, a new parameter `tls_insecure_skip_verify` can be set to true to skip validation optionally. #3030 * [CHANGE] `cortex_ruler_config_update_failures_total` has been removed in favor of `cortex_ruler_config_last_reload_successful`. #3056 +* [FEATURE] Logging of the source IP passed along by a reverse proxy is now supported by setting the `-server.log-source-ips-enabled`. For non standard headers the settings `-server.log-source-ips-header` and `-server.log-source-ips-regex` can be used. #2985 * [ENHANCEMENT] Add support for azure storage in China, German and US Government environments. #2988 * [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994 * [ENHANCEMENT] Query-tee: add support for doing a passthrough of requests to preferred backend for unregistered routes #3018 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 88ef0ce2841..a83c797cb71 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -487,3 +487,20 @@ The DNS service discovery, inspired from Thanos DNS SD, supports different disco The domain name after the prefix is looked up as a SRV query, and then each SRV record is resolved as an A/AAAA record. For example: `dnssrv+memcached.namespace.svc.cluster.local` - **`dnssrvnoa+`**
The domain name after the prefix is looked up as a SRV query, with no A/AAAA lookup made after that. For example: `dnssrvnoa+memcached.namespace.svc.cluster.local` + +## Logging of IP of reverse proxy + +If a reverse proxy is used in front of Cortex it might be diffult to troubleshoot errors. The following 3 settings can be used to log the IP address passed along by the reverse proxy in headers like X-Forwarded-For. + +- `-server.log_source_ips_enabled` + + Set this to `true` to add logging of the IP when a Forwarded, X-Real-IP or X-Forwarded-For header is used. A field called `sourceIPs` will be added to error logs when data is pushed into Cortex. + +- `-server.log-source-ips-header` + + Header field storing the source IPs. It is only used if `-server.log-source-ips-enabled` is true and if `-server.log-source-ips-regex` is set. If not set the default Forwarded, X-Real-IP or X-Forwarded-For headers are searched. + +- `-server.log-source-ips-regex` + + Regular expression for matching the source IPs. It should contain at least one capturing group the first of which will be returned. Only used if `-server.log-source-ips-enabled` is true and if `-server.log-source-ips-header` is set. If not set the default Forwarded, X-Real-IP or X-Forwarded-For headers are searched. + diff --git a/pkg/api/api.go b/pkg/api/api.go index 85443628c5a..f94e4fec7f7 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -65,17 +65,29 @@ type API struct { authMiddleware middleware.Interface server *server.Server logger log.Logger + sourceIPs *middleware.SourceIPExtractor } -func New(cfg Config, s *server.Server, logger log.Logger) (*API, error) { +func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) { // Ensure the encoded path is used. Required for the rules API s.HTTP.UseEncodedPath() + var sourceIPs *middleware.SourceIPExtractor + if serverCfg.LogSourceIPs { + var err error + sourceIPs, err = middleware.NewSourceIPs(serverCfg.LogSourceIPsHeader, serverCfg.LogSourceIPsRegex) + if err != nil { + // This should have already been caught in the Server creation + return nil, err + } + } + api := &API{ cfg: cfg, authMiddleware: cfg.HTTPAuthMiddleware, server: s, logger: logger, + sourceIPs: sourceIPs, } // If no authentication middleware is present in the config, use the default authentication middleware. @@ -161,12 +173,12 @@ func (a *API) RegisterAPI(cfg interface{}) { // RegisterDistributor registers the endpoints associated with the distributor. func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) { - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig, d.Push), true) + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig, a.sourceIPs, d.Push), true) a.RegisterRoute("/distributor/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false) a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false) // Legacy Routes - a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/push", push.Handler(pushConfig, d.Push), true) + a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/push", push.Handler(pushConfig, a.sourceIPs, d.Push), true) a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false) a.RegisterRoute("/ha-tracker", d.HATracker, false) } @@ -177,12 +189,12 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig distributor.Conf a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false) a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false) - a.RegisterRoute("/ingester/push", push.Handler(pushConfig, i.Push), true) // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig, a.sourceIPs, i.Push), true) // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false) a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false) - a.RegisterRoute("/push", push.Handler(pushConfig, i.Push), true) // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig, a.sourceIPs, i.Push), true) // For testing and debugging. } // RegisterPurger registers the endpoints associated with the Purger/DeleteStore. They do not exactly diff --git a/pkg/api/api_test.go b/pkg/api/api_test.go new file mode 100644 index 00000000000..934e4d935c0 --- /dev/null +++ b/pkg/api/api_test.go @@ -0,0 +1,61 @@ +package api + +import ( + "testing" + + "github.com/gorilla/mux" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/server" +) + +type FakeLogger struct{} + +func (fl *FakeLogger) Log(keyvals ...interface{}) error { + return nil +} + +func TestNewApiWithoutSourceIPExtractor(t *testing.T) { + cfg := Config{} + serverCfg := server.Config{ + MetricsNamespace: "without_source_ip_extractor", + } + server, err := server.New(serverCfg) + require.NoError(t, err) + + api, err := New(cfg, serverCfg, server, &FakeLogger{}) + + require.NoError(t, err) + require.Nil(t, api.sourceIPs) +} + +func TestNewApiWithSourceIPExtractor(t *testing.T) { + cfg := Config{} + serverCfg := server.Config{ + LogSourceIPs: true, + MetricsNamespace: "with_source_ip_extractor", + } + server, err := server.New(serverCfg) + require.NoError(t, err) + + api, err := New(cfg, serverCfg, server, &FakeLogger{}) + + require.NoError(t, err) + require.NotNil(t, api.sourceIPs) +} + +func TestNewApiWithInvalidSourceIPExtractor(t *testing.T) { + cfg := Config{} + s := server.Server{ + HTTP: &mux.Router{}, + } + serverCfg := server.Config{ + LogSourceIPs: true, + LogSourceIPsHeader: "SomeHeader", + LogSourceIPsRegex: "[*", + MetricsNamespace: "with_invalid_source_ip_extractor", + } + + api, err := New(cfg, serverCfg, &s, &FakeLogger{}) + require.Error(t, err) + require.Nil(t, api) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index fb25eb9370e..2ec992d1774 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -72,7 +72,7 @@ func (t *Cortex) initAPI() (services.Service, error) { t.Cfg.API.ServerPrefix = t.Cfg.Server.PathPrefix t.Cfg.API.LegacyHTTPPrefix = t.Cfg.HTTPPrefix - a, err := api.New(t.Cfg.API, t.Server, util.Logger) + a, err := api.New(t.Cfg.API, t.Cfg.Server, t.Server, util.Logger) if err != nil { return nil, err } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index caa0c46605d..0f315479b3a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -364,6 +364,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie if err != nil { return nil, err } + source := util.GetSourceIPsFromOutgoingCtx(ctx) var firstPartialErr error removeReplica := false @@ -538,6 +539,10 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie if sp := opentracing.SpanFromContext(ctx); sp != nil { localCtx = opentracing.ContextWithSpan(localCtx, sp) } + + // Get clientIP(s) from Context and add it to localCtx + localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source) + return d.send(localCtx, ingester, timeseries, metadata, req.Source) }, func() { client.ReuseSlice(req.Timeseries) }) if err != nil { diff --git a/pkg/util/extract_forwarded.go b/pkg/util/extract_forwarded.go new file mode 100644 index 00000000000..79eca5723c4 --- /dev/null +++ b/pkg/util/extract_forwarded.go @@ -0,0 +1,53 @@ +package util + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +// ipAddressesKey is key for the GRPC metadata where the IP addresses are stored +const ipAddressesKey = "github.com/cortexproject/cortex/util/extract_forwarded/x-forwarded-for" + +// GetSourceIPsFromOutgoingCtx extracts the source field from the GRPC context +func GetSourceIPsFromOutgoingCtx(ctx context.Context) string { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + return "" + } + ipAddresses, ok := md[ipAddressesKey] + if !ok { + return "" + } + return ipAddresses[0] +} + +// GetSourceIPsFromIncomingCtx extracts the source field from the GRPC context +func GetSourceIPsFromIncomingCtx(ctx context.Context) string { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "" + } + ipAddresses, ok := md[ipAddressesKey] + if !ok { + return "" + } + return ipAddresses[0] +} + +// AddSourceIPsToOutgoingContext adds the given source to the GRPC context +func AddSourceIPsToOutgoingContext(ctx context.Context, source string) context.Context { + if source != "" { + ctx = metadata.AppendToOutgoingContext(ctx, ipAddressesKey, source) + } + return ctx +} + +// AddSourceIPsToIncomingContext adds the given source to the GRPC context +func AddSourceIPsToIncomingContext(ctx context.Context, source string) context.Context { + if source != "" { + md := metadata.Pairs(ipAddressesKey, source) + ctx = metadata.NewIncomingContext(ctx, md) + } + return ctx +} diff --git a/pkg/util/extract_forwarded_test.go b/pkg/util/extract_forwarded_test.go new file mode 100644 index 00000000000..a8e19985c77 --- /dev/null +++ b/pkg/util/extract_forwarded_test.go @@ -0,0 +1,58 @@ +package util + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/metadata" +) + +func TestGetSourceFromOutgoingCtx(t *testing.T) { + tests := []struct { + name string + key string + value string + want string + }{ + { + name: "No value in key", + key: ipAddressesKey, + value: "", + want: "", + }, + { + name: "Value in key", + key: ipAddressesKey, + value: "172.16.1.1", + want: "172.16.1.1", + }, + { + name: "Stored under wrong key", + key: "wrongkey", + value: "172.16.1.1", + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test extracting from incoming context + ctx := context.Background() + if tt.value != "" { + md := metadata.Pairs(tt.key, tt.value) + ctx = metadata.NewIncomingContext(ctx, md) + } + got := GetSourceIPsFromIncomingCtx(ctx) + assert.Equal(t, tt.want, got) + + // Test extracting from outgoing context + ctx = context.Background() + if tt.value != "" { + md := metadata.Pairs(tt.key, tt.value) + ctx = metadata.NewOutgoingContext(ctx, md) + } + got = GetSourceIPsFromOutgoingCtx(ctx) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/util/log.go b/pkg/util/log.go index 38fe3367661..a34e02eea40 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -132,6 +132,12 @@ func WithTraceID(traceID string, l log.Logger) log.Logger { return log.With(l, "traceID", traceID) } +// WithSourceIPs returns a Logger that has information about the source IPs in +// its details. +func WithSourceIPs(sourceIPs string, l log.Logger) log.Logger { + return log.With(l, "sourceIPs", sourceIPs) +} + // CheckFatal prints an error and exits with error code 1 if err is non-nil func CheckFatal(location string, err error) { if err != nil { diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index f8eeec5d516..3fb7696cc34 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -6,6 +6,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -13,12 +14,20 @@ import ( ) // Handler is a http.Handler which accepts WriteRequests. -func Handler(cfg distributor.Config, push func(context.Context, *client.WriteRequest) (*client.WriteResponse, error)) http.Handler { +func Handler(cfg distributor.Config, sourceIPs *middleware.SourceIPExtractor, push func(context.Context, *client.WriteRequest) (*client.WriteResponse, error)) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + logger := util.WithContext(ctx, util.Logger) + if sourceIPs != nil { + source := sourceIPs.Get(r) + if source != "" { + ctx = util.AddSourceIPsToOutgoingContext(ctx, source) + logger = util.WithSourceIPs(source, logger) + } + } compressionType := util.CompressionTypeFor(r.Header.Get("X-Prometheus-Remote-Write-Version")) var req client.PreallocWriteRequest - _, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType) - logger := util.WithContext(r.Context(), util.Logger) + _, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), cfg.MaxRecvMsgSize, &req, compressionType) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -28,7 +37,7 @@ func Handler(cfg distributor.Config, push func(context.Context, *client.WriteReq req.Source = client.API } - if _, err := push(r.Context(), &req.WriteRequest); err != nil { + if _, err := push(ctx, &req.WriteRequest); err != nil { resp, ok := httpgrpc.HTTPResponseFromError(err) if !ok { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index ab87d4f5dc3..a628d3b5077 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/middleware" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -20,7 +21,7 @@ import ( func TestHandler_remoteWrite(t *testing.T) { req := createRequest(t, createPrometheusRemoteWriteProtobuf(t)) resp := httptest.NewRecorder() - handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.API)) + handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, nil, verifyWriteRequestHandler(t, client.API)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -28,7 +29,8 @@ func TestHandler_remoteWrite(t *testing.T) { func TestHandler_cortexWriteRequest(t *testing.T) { req := createRequest(t, createCortexWriteRequestProtobuf(t)) resp := httptest.NewRecorder() - handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, verifyWriteRequestHandler(t, client.RULE)) + sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") + handler := Handler(distributor.Config{MaxRecvMsgSize: 100000}, sourceIPs, verifyWriteRequestHandler(t, client.RULE)) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) }