diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e347e0b971..c8faccd7902 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ * [ENHANCEMENT] Store Gateway: Add new metrics `cortex_bucket_store_sent_chunk_size_bytes`, `cortex_bucket_store_postings_size_bytes` and `cortex_bucket_store_empty_postings_total`. #5397 * [ENHANCEMENT] Add jitter to lifecycler heartbeat. #5404 * [ENHANCEMENT] Store Gateway: Add config `estimated_max_series_size_bytes` and `estimated_max_chunk_size_bytes` to address data overfetch. #5401 +* [ENHANCEMENT] Distributor/Ingester: Add experimental `-distributor.sign-write-requests` flag to sign the write requests. #5430 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index b2ea9f086c1..efdc1fc6bf0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2191,6 +2191,11 @@ ha_tracker: # CLI flag: -distributor.extend-writes [extend_writes: <boolean> | default = true] +# EXPERIMENTAL: If enabled, sign the write request between distributors and +# ingesters. +# CLI flag: -distributor.sign-write-requests +[sign_write_requests: <boolean> | default = false] + ring: kvstore: # Backend storage to use for the ring. 
Supported values are: consul, etcd, diff --git a/go.mod b/go.mod index 330b9e3bf0e..2249da6d847 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,10 @@ require ( sigs.k8s.io/yaml v1.3.0 ) -require github.com/google/go-cmp v0.5.9 +require ( + github.com/cespare/xxhash/v2 v2.2.0 + github.com/google/go-cmp v0.5.9 +) require ( cloud.google.com/go v0.110.0 // indirect @@ -109,7 +112,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver/v4 v4.0.0 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index d6bf465c9d9..83496d35655 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -21,6 +21,8 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "gopkg.in/yaml.v2" + "github.com/cortexproject/cortex/pkg/util/grpcclient" + "github.com/cortexproject/cortex/pkg/alertmanager" "github.com/cortexproject/cortex/pkg/alertmanager/alertstore" "github.com/cortexproject/cortex/pkg/api" @@ -355,6 +357,7 @@ func New(cfg Config) (*Cortex, error) { cortex.setupThanosTracing() cortex.setupGRPCHeaderForwarding() + cortex.setupRequestSigning() if err := cortex.setupModuleManager(); err != nil { return nil, err @@ -379,6 +382,13 @@ func (t *Cortex) setupGRPCHeaderForwarding() { } } +func (t *Cortex) setupRequestSigning() { + if t.Cfg.Distributor.SignWriteRequestsEnabled { + util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled") + t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcclient.UnarySigningServerInterceptor) + } +} + // Run starts Cortex running, and blocks until a Cortex stops. func (t *Cortex) Run() error { // Register custom process metrics. 
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index a72fda1c59b..f72b182489e 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -203,6 +203,7 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) { func (t *Cortex) initDistributorService() (serv services.Service, err error) { t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod + t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. querier or diff --git a/pkg/cortexpb/extensions.go b/pkg/cortexpb/extensions.go new file mode 100644 index 00000000000..f900b775918 --- /dev/null +++ b/pkg/cortexpb/extensions.go @@ -0,0 +1,107 @@ +package cortexpb + +import ( + "context" + "fmt" + "strconv" + "sync" + + "github.com/cespare/xxhash/v2" + + "github.com/cortexproject/cortex/pkg/tenant" +) + +const maxBufferSize = 1024 +const signVersion = "v1" + +var signerPool = sync.Pool{ + New: func() interface{} { + return newSigner() + }, +} + +type signer struct { + h *xxhash.Digest + b []byte + optimized bool +} + +func newSigner() *signer { + s := &signer{ + h: xxhash.New(), + b: make([]byte, 0, maxBufferSize), + } + s.Reset() + return s +} + +func (s *signer) Reset() { + s.h.Reset() + s.b = s.b[:0] + s.optimized = true +} + +func (s *signer) WriteString(val string) { + switch { + case !s.optimized: + _, _ = s.h.WriteString(val) + case len(s.b)+len(val) > cap(s.b): + // If labels val does not fit in the []byte we fall back to not allocate the whole entry. + _, _ = s.h.Write(s.b) + _, _ = s.h.WriteString(val) + s.optimized = false + default: + // Use xxhash.Sum64(b) for fast path as it's faster. + s.b = append(s.b, val...) 
+ } +} + +func (s *signer) Sum64() uint64 { + if s.optimized { + return xxhash.Sum64(s.b) + } + + return s.h.Sum64() +} + +func (w *WriteRequest) VerifySign(ctx context.Context, signature string) (bool, error) { + s, err := w.Sign(ctx) + return s == signature, err +} + +func (w *WriteRequest) Sign(ctx context.Context) (string, error) { + u, err := tenant.TenantID(ctx) + if err != nil { + return "", err + } + + s := signerPool.Get().(*signer) + defer func() { + s.Reset() + signerPool.Put(s) + }() + s.WriteString(u) + + for _, md := range w.Metadata { + s.WriteString(strconv.Itoa(int(md.Type))) + s.WriteString(md.MetricFamilyName) + s.WriteString(md.Help) + s.WriteString(md.Unit) + } + + for _, ts := range w.Timeseries { + for _, lbl := range ts.Labels { + s.WriteString(lbl.Name) + s.WriteString(lbl.Value) + } + + for _, ex := range ts.Exemplars { + for _, lbl := range ex.Labels { + s.WriteString(lbl.Name) + s.WriteString(lbl.Value) + } + } + } + + return fmt.Sprintf("%v/%v", signVersion, s.Sum64()), nil +} diff --git a/pkg/cortexpb/extensions_test.go b/pkg/cortexpb/extensions_test.go new file mode 100644 index 00000000000..0f8b6385264 --- /dev/null +++ b/pkg/cortexpb/extensions_test.go @@ -0,0 +1,100 @@ +package cortexpb + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" +) + +func TestWriteRequest_Sign(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + tests := map[string]struct { + w *WriteRequest + expectedSign string + }{ + "small write with exemplar": { + w: createWriteRequest(10, true, "family1", "help1", "unit"), + expectedSign: "v1/9125893422459502203", + }, + "small write with exemplar and changed md": { + w: createWriteRequest(10, true, "family2", "help1", "unit"), + expectedSign: "v1/18044786562323437562", + }, + "small write without exemplar": { + w: createWriteRequest(10, false, "family1", "help1", "unit"), + expectedSign: 
"v1/7697478040597284323", + }, + "big write with exemplar": { + w: createWriteRequest(10000, true, "family1", "help1", "unit"), + expectedSign: "v1/18402783317092766507", + }, + "big write without exemplar": { + w: createWriteRequest(10000, false, "family1", "help1", "unit"), + expectedSign: "v1/14973071954515615892", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + // running multiple times in parallel to make sure no race + itNumber := 1000 + wg := sync.WaitGroup{} + wg.Add(itNumber) + for i := 0; i < itNumber; i++ { + go func() { + defer wg.Done() + s, err := tc.w.Sign(ctx) + require.NoError(t, err) + // Make sure this sign doesn't change + require.Equal(t, tc.expectedSign, s) + }() + } + wg.Wait() + }) + } +} + +func createWriteRequest(numTs int, exemplar bool, family string, help string, unit string) *WriteRequest { + w := &WriteRequest{} + w.Metadata = []*MetricMetadata{ + { + MetricFamilyName: family, + Help: help, + Unit: unit, + }, + } + + for i := 0; i < numTs; i++ { + w.Timeseries = append(w.Timeseries, PreallocTimeseries{ + TimeSeries: &TimeSeries{ + Labels: []LabelAdapter{ + { + Name: fmt.Sprintf("Name-%v", i), + Value: fmt.Sprintf("Value-%v", i), + }, + }, + }, + }) + + if exemplar { + w.Timeseries[i].Exemplars = []Exemplar{ + { + Labels: []LabelAdapter{ + { + Name: fmt.Sprintf("Ex-Name-%v", i), + Value: fmt.Sprintf("Ex-Value-%v", i), + }, + }, + }, + } + } + } + + return w +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index bdd10333764..7ce4179e54a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -127,9 +127,10 @@ type Config struct { RemoteTimeout time.Duration `yaml:"remote_timeout"` ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"` - ShardingStrategy string `yaml:"sharding_strategy"` - ShardByAllLabels bool `yaml:"shard_by_all_labels"` - ExtendWrites bool `yaml:"extend_writes"` + ShardingStrategy string `yaml:"sharding_strategy"` + 
ShardByAllLabels bool `yaml:"shard_by_all_labels"` + ExtendWrites bool `yaml:"extend_writes"` + SignWriteRequestsEnabled bool `yaml:"sign_write_requests"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` @@ -163,6 +164,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.") + f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.") f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. 
It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.") @@ -181,6 +183,7 @@ func (cfg *Config) Validate(limits validation.Limits) error { } haHATrackerConfig := cfg.HATrackerConfig.ToHATrackerConfig() + return haHATrackerConfig.Validate() } diff --git a/pkg/util/grpcclient/grpcclient.go b/pkg/util/grpcclient/grpcclient.go index cfec645c938..61f7bfce281 100644 --- a/pkg/util/grpcclient/grpcclient.go +++ b/pkg/util/grpcclient/grpcclient.go @@ -29,8 +29,9 @@ type Config struct { BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` BackoffConfig backoff.Config `yaml:"backoff_config"` - TLSEnabled bool `yaml:"tls_enabled"` - TLS tls.ClientConfig `yaml:",inline"` + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` + SignWriteRequestsEnabled bool `yaml:"-"` } // RegisterFlags registers flags. @@ -91,6 +92,10 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...) 
} + if cfg.SignWriteRequestsEnabled { + unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor) + } + return append( opts, grpc.WithDefaultCallOptions(cfg.CallOptions()...), diff --git a/pkg/util/grpcclient/signing_handler.go b/pkg/util/grpcclient/signing_handler.go new file mode 100644 index 00000000000..d5b7803f289 --- /dev/null +++ b/pkg/util/grpcclient/signing_handler.go @@ -0,0 +1,96 @@ +package grpcclient + +import ( + "context" + + "github.com/weaveworks/common/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +var ( + reqSignHeaderName = "x-req-signature" +) + +const ( + ErrDifferentSignaturePresent = errors.Error("different signature already present") + ErrMultipleSignaturePresent = errors.Error("multiples signature present") + ErrSignatureNotPresent = errors.Error("signature not present") + ErrSignatureMismatch = errors.Error("signature mismatch") +) + +// SignRequest define the interface that must be implemented by the request structs to be signed +type SignRequest interface { + // Sign returns the signature for the given request + Sign(context.Context) (string, error) + // VerifySign returns true if the signature is valid + VerifySign(context.Context, string) (bool, error) +} + +func UnarySigningServerInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + rs, ok := req.(SignRequest) + if !ok { + return handler(ctx, req) + } + + md, ok := metadata.FromIncomingContext(ctx) + + if !ok { + return nil, ErrSignatureNotPresent + } + + sig, ok := md[reqSignHeaderName] + + if !ok || len(sig) != 1 { + return nil, ErrSignatureNotPresent + } + + valid, err := rs.VerifySign(ctx, sig[0]) + + if err != nil { + return nil, err + } + + if !valid { + return nil, ErrSignatureMismatch + } + + return handler(ctx, req) +} + +func UnarySigningClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc 
*grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + rs, ok := req.(SignRequest) + + if !ok { + return invoker(ctx, method, req, reply, cc, opts...) + } + + signature, err := rs.Sign(ctx) + + if err != nil { + return err + } + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(map[string]string{}) + } + + newCtx := ctx + + if s, ok := md[reqSignHeaderName]; ok { + if len(s) == 1 { + if s[0] != signature { + return ErrDifferentSignaturePresent + } + } else { + return ErrMultipleSignaturePresent + } + } else { + md = md.Copy() + md[reqSignHeaderName] = []string{signature} + newCtx = metadata.NewOutgoingContext(ctx, md) + } + + return invoker(newCtx, method, req, reply, cc, opts...) +} diff --git a/pkg/util/grpcclient/signing_handler_test.go b/pkg/util/grpcclient/signing_handler_test.go new file mode 100644 index 00000000000..4682b34a45f --- /dev/null +++ b/pkg/util/grpcclient/signing_handler_test.go @@ -0,0 +1,67 @@ +package grpcclient + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +func TestUnarySigningHandler(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + w := &cortexpb.WriteRequest{} + + // Sign Request + err := UnarySigningClientInterceptor(ctx, "", w, w, nil, func(c context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + ctx = c + return nil + }) + + require.NoError(t, err) + + // Verify the outgoing context + md, ok := metadata.FromOutgoingContext(ctx) + signature, err := w.Sign(ctx) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, md[reqSignHeaderName][0], signature) + ctx = metadata.NewIncomingContext(ctx, md) + + // Verify signature on the server side + _, err = UnarySigningServerInterceptor(ctx, w, nil, 
func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + require.NoError(t, err) + + // Change user id and make sure the request signature mismatch + ctx = user.InjectOrgID(ctx, "user-2") + _, err = UnarySigningServerInterceptor(ctx, w, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + require.ErrorIs(t, err, ErrSignatureMismatch) + + // Return error when signature is not present + ctx = user.InjectOrgID(context.Background(), "user-") + + _, err = UnarySigningServerInterceptor(ctx, w, nil, func(ctx context.Context, req interface{}) (interface{}, error) { + return nil, nil + }) + + require.ErrorIs(t, err, ErrSignatureNotPresent) + + // Return error when multiples signatures are present + md[reqSignHeaderName] = append(md[reqSignHeaderName], "sig1", "sig2") + ctx = metadata.NewOutgoingContext(ctx, md) + err = UnarySigningClientInterceptor(ctx, "", w, w, nil, func(c context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + ctx = c + return nil + }) + require.ErrorIs(t, err, ErrMultipleSignaturePresent) +}