From e982edf1e164d0c3859be99cc1732be1e55f2d88 Mon Sep 17 00:00:00 2001 From: Ed Snible Date: Tue, 31 May 2022 09:33:19 -0400 Subject: [PATCH] Flow tenant from GRPC PostSpans header through processors Signed-off-by: Ed Snible --- cmd/collector/app/flags/flags.go | 10 + cmd/collector/app/flags/flags_test.go | 36 +++ cmd/collector/app/handler/grpc_handler.go | 41 ++- .../app/handler/grpc_handler_test.go | 244 +++++++++++++++--- cmd/collector/app/handler/otlp_receiver.go | 2 +- cmd/collector/app/server/grpc_test.go | 11 +- cmd/collector/app/span_handler_builder.go | 3 +- pkg/config/tenancy/flags.go | 53 ++++ pkg/config/tenancy/flags_test.go | 101 ++++++++ pkg/config/tenancy/tenancy.go | 86 ++++++ pkg/config/tenancy/tenancy_test.go | 91 +++++++ 11 files changed, 627 insertions(+), 51 deletions(-) create mode 100644 pkg/config/tenancy/flags.go create mode 100644 pkg/config/tenancy/flags_test.go create mode 100644 pkg/config/tenancy/tenancy.go create mode 100644 pkg/config/tenancy/tenancy_test.go diff --git a/cmd/collector/app/flags/flags.go b/cmd/collector/app/flags/flags.go index 77641d64871..f081b62fb12 100644 --- a/cmd/collector/app/flags/flags.go +++ b/cmd/collector/app/flags/flags.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/flags" + "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/ports" ) @@ -152,6 +153,8 @@ type GRPCOptions struct { // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. MaxConnectionAgeGrace time.Duration + // Tenancy configures tenancy for endpoints that collect spans + Tenancy tenancy.Options } // AddFlags adds flags for CollectorOptions @@ -172,6 +175,8 @@ func AddFlags(flags *flag.FlagSet) { flags.String(flagZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all") flags.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)") tlsZipkinFlagsConfig.AddFlags(flags) + + tenancy.AddFlags(flags) } func addHTTPFlags(flags *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort string) { @@ -219,6 +224,11 @@ func (opts *GRPCOptions) initFromViper(v *viper.Viper, logger *zap.Logger, cfg s } else { return fmt.Errorf("failed to parse gRPC TLS options: %w", err) } + if tenancy, err := tenancy.InitFromViper(v); err == nil { + opts.Tenancy = tenancy + } else { + return fmt.Errorf("failed to parse Tenancy options: %w", err) + } return nil } diff --git a/cmd/collector/app/flags/flags_test.go b/cmd/collector/app/flags/flags_test.go index c9299d26bcb..194f600dc79 100644 --- a/cmd/collector/app/flags/flags_test.go +++ b/cmd/collector/app/flags/flags_test.go @@ -106,3 +106,39 @@ func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) { assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge) assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace) } + +func TestCollectorOptionsWithFlags_CheckNoTenancy(t *testing.T) { + c := &CollectorOptions{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{}) + c.InitFromViper(v, zap.NewNop()) + + assert.Equal(t, false, c.GRPC.Tenancy.Enabled) +} + +func TestCollectorOptionsWithFlags_CheckSimpleTenancy(t *testing.T) { + c := &CollectorOptions{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--multi_tenancy.enabled=true", + }) + c.InitFromViper(v, zap.NewNop()) + + assert.Equal(t, true, c.GRPC.Tenancy.Enabled) + assert.Equal(t, "x-tenant", c.GRPC.Tenancy.Header) +} + +func TestCollectorOptionsWithFlags_CheckFullTenancy(t *testing.T) { + c := &CollectorOptions{} + v, command := config.Viperize(AddFlags) + command.ParseFlags([]string{ + "--multi_tenancy.enabled=true", + "--multi_tenancy.header=custom-tenant-header", + "--multi_tenancy.tenants=acme,hardware-store", + }) + c.InitFromViper(v, zap.NewNop()) + + assert.Equal(t, true, c.GRPC.Tenancy.Enabled) + assert.Equal(t, "custom-tenant-header", c.GRPC.Tenancy.Header) + assert.Equal(t, []string{"acme", "hardware-store"}, c.GRPC.Tenancy.Tenants) +} diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 51d35920b19..095bb679a19 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -20,10 +20,12 @@ import ( "go.uber.org/zap" "google.golang.org/grpc/codes" _ "google.golang.org/grpc/encoding/gzip" // register zip encoding + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -31,10 +33,11 @@ import ( type GRPCHandler struct { logger *zap.Logger batchConsumer batchConsumer + tenancyConfig *tenancy.TenancyConfig } // NewGRPCHandler registers routes for this handler on the given router. -func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler { +func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler { return &GRPCHandler{ logger: logger, batchConsumer: batchConsumer{ @@ -45,13 +48,20 @@ func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) * SpanFormat: processor.ProtoSpanFormat, }, }, + tenancyConfig: tenancyConfig, } } // PostSpans implements gRPC CollectorService. func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) { + tenant, err := g.validateTenant(ctx) + if err != nil { + g.logger.Error("rejecting spans (tenancy)", zap.Error(err)) + return nil, err + } + batch := &r.Batch - err := g.batchConsumer.consume(batch) + err = g.batchConsumer.consume(batch, tenant) return &api_v2.PostSpansResponse{}, err } @@ -61,7 +71,7 @@ type batchConsumer struct { spanOptions processor.SpansOptions } -func (c *batchConsumer) consume(batch *model.Batch) error { +func (c *batchConsumer) consume(batch *model.Batch, tenant string) error { for _, span := range batch.Spans { if span.GetProcess() == nil { span.Process = batch.Process @@ -70,6 +80,7 @@ func (c *batchConsumer) consume(batch *model.Batch) error { _, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{ InboundTransport: processor.GRPCTransport, SpanFormat: processor.ProtoSpanFormat, + Tenant: tenant, }) if err != nil { if err == processor.ErrBusy { @@ -80,3 +91,27 @@ func (c *batchConsumer) consume(batch *model.Batch) error { } return nil } + +func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) { + if !g.tenancyConfig.Enabled { + return "", nil + } + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "", status.Errorf(codes.PermissionDenied, "missing tenant header") + } + + tenants := md[g.tenancyConfig.Header] + if len(tenants) < 1 { + return "", status.Errorf(codes.PermissionDenied, "missing tenant header") + } else if len(tenants) > 1 { + return "", status.Errorf(codes.PermissionDenied, "extra tenant header") + } + + if !g.tenancyConfig.Valid(tenants[0]) { + return "", status.Errorf(codes.PermissionDenied, "unknown tenant") + } + + return tenants[0], nil +} diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index a38b7bc41bf..98a935063ed 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -26,10 +26,11 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -37,6 +38,7 @@ type mockSpanProcessor struct { expectedError error mux sync.Mutex spans []*model.Span + tenants map[string]bool } func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) { @@ -44,6 +46,12 @@ func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.Spa defer p.mux.Unlock() p.spans = append(p.spans, spans...) oks := make([]bool, len(spans)) + + if p.tenants == nil { + p.tenants = make(map[string]bool) + } + p.tenants[opts.Tenant] = true + return oks, p.expectedError } @@ -53,10 +61,17 @@ func (p *mockSpanProcessor) getSpans() []*model.Span { return p.spans } +func (p *mockSpanProcessor) getTenants() map[string]bool { + p.mux.Lock() + defer p.mux.Unlock() + return p.tenants +} + func (p *mockSpanProcessor) reset() { p.mux.Lock() defer p.mux.Unlock() p.spans = nil + p.tenants = nil } func (p *mockSpanProcessor) Close() error { @@ -84,7 +99,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp func TestPostSpans(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor) + handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -115,7 +130,7 @@ func TestPostSpans(t *testing.T) { func TestGRPCCompressionEnabled(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor) + handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -124,55 +139,202 @@ func TestGRPCCompressionEnabled(t *testing.T) { defer conn.Close() // Do not use string constant imported from grpc, since we are actually testing that package is imported by the handler. - _, err := client.PostSpans( - context.Background(), - &api_v2.PostSpansRequest{}, - grpc.UseCompressor("gzip"), - ) + _, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{}, + grpc.UseCompressor("gzip")) require.NoError(t, err) } func TestPostSpansWithError(t *testing.T) { - testCases := []struct { - processorError error - expectedError string - expectedLog string + expectedError := errors.New("test-error") + processor := &mockSpanProcessor{expectedError: expectedError} + server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { + handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{}) + api_v2.RegisterCollectorServiceServer(s, handler) + }) + defer server.Stop() + client, conn := newClient(t, addr) + defer conn.Close() + r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{ + Batch: model.Batch{ + Spans: []*model.Span{ + { + OperationName: "fake-operation", + }, + }, + }, + }) + require.Error(t, err) + require.Nil(t, r) + require.Contains(t, err.Error(), expectedError.Error()) + require.Len(t, processor.getSpans(), 1) +} + +// withMetadata returns a Context with metadata for outbound (client) calls +func withMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context { + t.Helper() + + md := metadata.New(map[string]string{headerName: headerValue}) + return metadata.NewOutgoingContext(ctx, md) +} + +func TestPostTenantedSpans(t *testing.T) { + tenantHeader := "x-tenant" + dummyTenant := "grpc-test-tenant" + + processor := &mockSpanProcessor{} + server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { + handler := NewGRPCHandler(zap.NewNop(), processor, + tenancy.NewTenancyConfig(&tenancy.Options{ + Enabled: true, + Header: tenantHeader, + Tenants: []string{dummyTenant}, + })) + api_v2.RegisterCollectorServiceServer(s, handler) + }) + defer server.Stop() + client, conn := newClient(t, addr) + defer conn.Close() + + ctxWithTenant := withMetadata(context.Background(), tenantHeader, dummyTenant, t) + ctxNoTenant := context.Background() + mdTwoTenants := metadata.Pairs() + mdTwoTenants.Set(tenantHeader, "a", "b") + ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants) + ctxBadTenant := withMetadata(context.Background(), tenantHeader, "invalid-tenant", t) + + withMetadata(context.Background(), + tenantHeader, dummyTenant, t) + + tests := []struct { + name string + ctx context.Context + batch model.Batch + mustFail bool + expected []*model.Span + expectedTenants map[string]bool }{ { - processorError: errors.New("test-error"), - expectedError: "test-error", - expectedLog: "test-error", + name: "valid tenant", + ctx: ctxWithTenant, + batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}}, + + mustFail: false, + expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}, + expectedTenants: map[string]bool{dummyTenant: true}, + }, + { + name: "no tenant", + ctx: ctxNoTenant, + batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}}, + + // Because NewGRPCHandler expects a tenant header, it will reject spans without one + mustFail: true, + expected: nil, + expectedTenants: nil, }, { - processorError: processor.ErrBusy, - expectedError: "server busy", + name: "two tenants", + ctx: ctxTwoTenants, + batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}}, + + // NewGRPCHandler rejects spans with multiple values for tenant header + mustFail: true, + expected: nil, + expectedTenants: nil, + }, + { + name: "invalid tenant", + ctx: ctxBadTenant, + batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}}, + + // NewGRPCHandler rejects spans with multiple values for tenant header + mustFail: true, + expected: nil, + expectedTenants: nil, }, } - for _, test := range testCases { - t.Run(test.expectedError, func(t *testing.T) { - processor := &mockSpanProcessor{expectedError: test.processorError} - logger, logBuf := testutils.NewLogger() - server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(logger, processor) - api_v2.RegisterCollectorServiceServer(s, handler) - }) - defer server.Stop() - client, conn := newClient(t, addr) - defer conn.Close() - r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{ - Batch: model.Batch{ - Spans: []*model.Span{ - { - OperationName: "fake-operation", - }, - }, - }, + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + _, err := client.PostSpans(test.ctx, &api_v2.PostSpansRequest{ + Batch: test.batch, }) - require.Error(t, err) - require.Nil(t, r) - assert.Contains(t, err.Error(), test.expectedError) - assert.Contains(t, logBuf.String(), test.expectedLog) - assert.Len(t, processor.getSpans(), 1) + if test.mustFail { + require.Error(t, err) + } else { + require.NoError(t, err) + } + assert.Equal(t, test.expected, processor.getSpans()) + assert.Equal(t, test.expectedTenants, processor.getTenants()) + processor.reset() + }) + } +} + +// withIncomingMetadata returns a Context with metadata for a server to receive +func withIncomingMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context { + t.Helper() + + md := metadata.New(map[string]string{headerName: headerValue}) + return metadata.NewIncomingContext(ctx, md) +} + +func TestGetTenant(t *testing.T) { + tenantHeader := "some-tenant-header" + validTenants := []string{"acme", "another-example"} + + mdTwoTenants := metadata.Pairs() + mdTwoTenants.Set(tenantHeader, "a", "b") + ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants) + + tests := []struct { + name string + ctx context.Context + tenant string + mustFail bool + }{ + { + name: "valid tenant", + ctx: withIncomingMetadata(context.TODO(), tenantHeader, "acme", t), + mustFail: false, + tenant: "acme", + }, + { + name: "no tenant", + ctx: context.TODO(), + mustFail: true, + tenant: "", + }, + { + name: "two tenants", + ctx: ctxTwoTenants, + mustFail: true, + tenant: "", + }, + { + name: "invalid tenant", + ctx: withIncomingMetadata(context.TODO(), tenantHeader, "an-invalid-tenant", t), + mustFail: true, + tenant: "", + }, + } + + processor := &mockSpanProcessor{} + handler := NewGRPCHandler(zap.NewNop(), processor, + tenancy.NewTenancyConfig(&tenancy.Options{ + Enabled: true, + Header: tenantHeader, + Tenants: validTenants, + })) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tenant, err := handler.validateTenant(test.ctx) + if test.mustFail { + require.Error(t, err) + } else { + require.NoError(t, err) + } + assert.Equal(t, test.tenant, tenant) }) } } diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index 41da6163bf4..69854cd32d4 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -162,7 +162,7 @@ func (c *consumerDelegate) consume(_ context.Context, td ptrace.Traces) error { return err } for _, batch := range batches { - err := c.batchConsumer.consume(batch) + err := c.batchConsumer.consume(batch, "") if err != nil { return err } diff --git a/cmd/collector/app/server/grpc_test.go b/cmd/collector/app/server/grpc_test.go index 256229a4dbf..34ab8d0e460 100644 --- a/cmd/collector/app/server/grpc_test.go +++ b/cmd/collector/app/server/grpc_test.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/internal/grpctest" + "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -39,7 +40,7 @@ func TestFailToListen(t *testing.T) { logger, _ := zap.NewDevelopment() server, err := StartGRPCServer(&GRPCServerParams{ HostPort: ":-1", - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), SamplingStore: &mockSamplingStore{}, Logger: logger, }) @@ -56,7 +57,7 @@ func TestFailServe(t *testing.T) { logger := zap.New(core) serveGRPC(grpc.NewServer(), lis, &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), SamplingStore: &mockSamplingStore{}, Logger: logger, OnError: func(e error) { @@ -71,7 +72,7 @@ func TestFailServe(t *testing.T) { func TestSpanCollector(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), SamplingStore: &mockSamplingStore{}, Logger: logger, MaxReceiveMessageLength: 1024 * 1024, @@ -96,7 +97,7 @@ func TestSpanCollector(t *testing.T) { func TestCollectorStartWithTLS(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), SamplingStore: &mockSamplingStore{}, Logger: logger, TLSConfig: tlscfg.Options{ @@ -115,7 +116,7 @@ func TestCollectorStartWithTLS(t *testing.T) { func TestCollectorReflection(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), SamplingStore: &mockSamplingStore{}, Logger: logger, } diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index 362b6f1764c..6ba46da3856 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -74,7 +75,7 @@ func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor zs.NewChainedSanitizer(zs.NewStandardSanitizers()...), ), handler.NewJaegerSpanHandler(b.Logger, spanProcessor), - handler.NewGRPCHandler(b.Logger, spanProcessor), + handler.NewGRPCHandler(b.Logger, spanProcessor, tenancy.NewTenancyConfig(&b.CollectorOpts.GRPC.Tenancy)), } } diff --git a/pkg/config/tenancy/flags.go b/pkg/config/tenancy/flags.go new file mode 100644 index 00000000000..0b757dd8980 --- /dev/null +++ b/pkg/config/tenancy/flags.go @@ -0,0 +1,53 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "flag" + "fmt" + "strings" + + "github.com/spf13/viper" +) + +const ( + tenancyEnabled = "multi_tenancy.enabled" + tenancyHeader = "multi_tenancy.header" + validTenants = "multi_tenancy.tenants" +) + +// AddFlags adds flags for tenancy to the FlagSet. +func AddFlags(flags *flag.FlagSet) { + flags.Bool(tenancyEnabled, false, "Enable tenancy header when receiving or querying") + flags.String(tenancyHeader, "x-tenant", "HTTP header carrying tenant") + flags.String(validTenants, "", + fmt.Sprintf("comma-separated list of allowed values for --%s header. (If not supplied, tenants are not restricted)", + tenancyHeader)) +} + +// InitFromViper creates tenancy.Options populated with values retrieved from Viper. +func InitFromViper(v *viper.Viper) (Options, error) { + var p Options + p.Enabled = v.GetBool(tenancyEnabled) + p.Header = v.GetString(tenancyHeader) + tenants := v.GetString(validTenants) + if len(tenants) != 0 { + p.Tenants = strings.Split(tenants, ",") + } else { + p.Tenants = []string{} + } + + return p, nil +} diff --git a/pkg/config/tenancy/flags_test.go b/pkg/config/tenancy/flags_test.go new file mode 100644 index 00000000000..425e91c6ff2 --- /dev/null +++ b/pkg/config/tenancy/flags_test.go @@ -0,0 +1,101 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "flag" + "testing" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTenancyFlags(t *testing.T) { + tests := []struct { + name string + cmd []string + expected Options + }{ + { + name: "one tenant", + cmd: []string{ + "--multi_tenancy.enabled=true", + "--multi_tenancy.tenants=acme", + }, + expected: Options{ + Enabled: true, + Header: "x-tenant", + Tenants: []string{"acme"}, + }, + }, + { + name: "two tenants", + cmd: []string{ + "--multi_tenancy.enabled=true", + "--multi_tenancy.tenants=acme,country-store", + }, + expected: Options{ + Enabled: true, + Header: "x-tenant", + Tenants: []string{"acme", "country-store"}, + }, + }, + { + name: "custom header", + cmd: []string{ + "--multi_tenancy.enabled=true", + "--multi_tenancy.header=jaeger-tenant", + "--multi_tenancy.tenants=acme", + }, + expected: Options{ + Enabled: true, + Header: "jaeger-tenant", + Tenants: []string{"acme"}, + }, + }, + { + // Not supplying a list of tenants will mean + // "tenant header required, but any value will pass" + name: "no_tenants", + cmd: []string{ + "--multi_tenancy.enabled=true", + }, + expected: Options{ + Enabled: true, + Header: "x-tenant", + Tenants: []string{}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + v := viper.New() + command := cobra.Command{} + flagSet := &flag.FlagSet{} + AddFlags(flagSet) + command.PersistentFlags().AddGoFlagSet(flagSet) + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags(test.cmd) + require.NoError(t, err) + tenancyCfg, err := InitFromViper(v) + require.NoError(t, err) + assert.Equal(t, test.expected, tenancyCfg) + }) + } +} diff --git a/pkg/config/tenancy/tenancy.go b/pkg/config/tenancy/tenancy.go new file mode 100644 index 00000000000..58909b7af16 --- /dev/null +++ b/pkg/config/tenancy/tenancy.go @@ -0,0 +1,86 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +// TenancyConfig holds the settings for multi-tenant Jaeger +type TenancyConfig struct { + Enabled bool + Header string + guard guard +} + +// Guard verifies a valid tenant when tenancy is enabled +type guard interface { + Valid(candidate string) bool +} + +// Options describes the configuration properties for multitenancy +type Options struct { + Enabled bool + Header string + Tenants []string +} + +// NewTenancyConfig creates a tenancy configuration for tenancy Options +func NewTenancyConfig(options *Options) *TenancyConfig { + return &TenancyConfig{ + Enabled: options.Enabled, + Header: options.Header, + guard: tenancyGuardFactory(options), + } +} + +func (tc *TenancyConfig) Valid(tenant string) bool { + return tc.guard.Valid(tenant) +} + +type tenantDontCare bool + +func (tenantDontCare) Valid(candidate string) bool { + return true +} + +type tenantList struct { + tenants map[string]bool +} + +func (tl *tenantList) Valid(candidate string) bool { + _, ok := tl.tenants[candidate] + return ok +} + +func newTenantList(tenants []string) *tenantList { + tenantMap := make(map[string]bool) + for _, tenant := range tenants { + tenantMap[tenant] = true + } + + return &tenantList{ + tenants: tenantMap, + } +} + +func tenancyGuardFactory(options *Options) guard { + // Three cases + // - no tenancy + // - tenancy, but no guarding by tenant + // - tenancy, with guarding by a list + + if !options.Enabled || len(options.Tenants) == 0 { + return tenantDontCare(true) + } + + return newTenantList(options.Tenants) +} diff --git a/pkg/config/tenancy/tenancy_test.go b/pkg/config/tenancy/tenancy_test.go new file mode 100644 index 00000000000..b84bd25d405 --- /dev/null +++ b/pkg/config/tenancy/tenancy_test.go @@ -0,0 +1,91 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestTenancyValidity(t *testing.T) { + tests := []struct { + name string + options Options + tenant string + valid bool + }{ + { + name: "valid single tenant", + options: Options{ + Enabled: true, + Header: "x-tenant", + Tenants: []string{"acme"}, + }, + tenant: "acme", + valid: true, + }, + { + name: "valid tenant in multi-tenant setup", + options: Options{ + Enabled: true, + Header: "x-tenant", + Tenants: []string{"acme", "country-store"}, + }, + tenant: "acme", + valid: true, + }, + { + name: "invalid tenant", + options: Options{ + Enabled: true, + Header: "x-tenant", + Tenants: []string{"acme", "country-store"}, + }, + tenant: "auto-repair", + valid: false, + }, + { + // Not supplying a list of tenants will mean + // "tenant header required, but any value will pass" + name: "any tenant", + options: Options{ + Enabled: true, + Header: "x-tenant", + Tenants: []string{}, + }, + tenant: "convenience-store", + valid: true, + }, + { + name: "ignore tenant", + options: Options{ + Enabled: false, + Header: "", + Tenants: []string{"acme"}, + }, + tenant: "country-store", + // If tenancy not enabled, any tenant is valid + valid: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tc := NewTenancyConfig(&test.options) + assert.Equal(t, test.valid, tc.Valid(test.tenant)) + }) + } +}