Skip to content

Commit

Permalink
(When configured for multitenancy) validate tenancy header and pass J…
Browse files Browse the repository at this point in the history
…aeger gRPC tenant header to span processors and storage

Signed-off-by: Ed Snible <[email protected]>
  • Loading branch information
esnible committed May 19, 2022
1 parent 4f9f7df commit c6db258
Show file tree
Hide file tree
Showing 11 changed files with 497 additions and 13 deletions.
7 changes: 7 additions & 0 deletions cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/ports"
)
Expand Down Expand Up @@ -54,6 +55,8 @@ var tlsZipkinFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.zipkin",
}

var tenancyFlagsConfig = tenancy.TenancyFlagsConfig{}

// CollectorOptions holds configuration for collector
type CollectorOptions struct {
// DynQueueSizeMemory determines how much memory to use for the queue
Expand Down Expand Up @@ -88,6 +91,8 @@ type CollectorOptions struct {
// CollectorGRPCMaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
CollectorGRPCMaxConnectionAgeGrace time.Duration
// TenancyGRPC configures tenancy for gRPC endpoint to collect spans
TenancyGRPC tenancy.Options
}

// AddFlags adds flags for CollectorOptions
Expand All @@ -108,6 +113,8 @@ func AddFlags(flags *flag.FlagSet) {
tlsGRPCFlagsConfig.AddFlags(flags)
tlsHTTPFlagsConfig.AddFlags(flags)
tlsZipkinFlagsConfig.AddFlags(flags)

tenancyFlagsConfig.AddFlags(flags)
}

// InitFromViper initializes CollectorOptions with properties from viper
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor, builderOpts)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: builderOpts.CollectorGRPCHostPort,
Expand Down
37 changes: 36 additions & 1 deletion cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,42 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
tenancyConfig *tenancy.TenancyConfig
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler {
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
return &GRPCHandler{
logger: logger,
spanProcessor: spanProcessor,
tenancyConfig: tenancyConfig,
}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
var tenant string
if g.tenancyConfig.Enabled {
var err error
tenant, err = g.validateTenant(ctx)
if err != nil {
g.logger.Error("rejecting spans (tenancy)", zap.Error(err))
return nil, err
}
}

for _, span := range r.GetBatch().Spans {
if span.GetProcess() == nil {
span.Process = r.Batch.Process
Expand All @@ -50,6 +64,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
Tenant: tenant,
})
if err != nil {
if err == processor.ErrBusy {
Expand All @@ -60,3 +75,23 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}
return &api_v2.PostSpansResponse{}, nil
}

func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[g.tenancyConfig.Header]
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

if !g.tenancyConfig.Valid.Valid(tenants[0]) {
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
}

return tenants[0], nil
}
189 changes: 186 additions & 3 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,32 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
spans []*model.Span
tenants map[string]bool
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
oks := make([]bool, len(spans))

if p.tenants == nil {
p.tenants = make(map[string]bool)
}
p.tenants[opts.Tenant] = true

return oks, p.expectedError
}

Expand All @@ -52,10 +61,17 @@ func (p *mockSpanProcessor) getSpans() []*model.Span {
return p.spans
}

func (p *mockSpanProcessor) getTenants() map[string]bool {
p.mux.Lock()
defer p.mux.Unlock()
return p.tenants
}

func (p *mockSpanProcessor) reset() {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = nil
p.tenants = nil
}

func (p *mockSpanProcessor) Close() error {
Expand Down Expand Up @@ -83,7 +99,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp
func TestPostSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -114,7 +130,7 @@ func TestPostSpans(t *testing.T) {
func TestGRPCCompressionEnabled(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand All @@ -132,7 +148,7 @@ func TestPostSpansWithError(t *testing.T) {
expectedError := errors.New("test-error")
processor := &mockSpanProcessor{expectedError: expectedError}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{})
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand All @@ -152,3 +168,170 @@ func TestPostSpansWithError(t *testing.T) {
require.Contains(t, err.Error(), expectedError.Error())
require.Len(t, processor.getSpans(), 1)
}

// withMetadata returns a Context with metadata for outbound (client) calls
func withMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context {
t.Helper()

md := metadata.New(map[string]string{headerName: headerValue})
return metadata.NewOutgoingContext(ctx, md)
}

// @@@ ecs TODO also add something similar to queries and v3 queries
func TestPostTenantedSpans(t *testing.T) {
tenantHeader := "x-tenant"
// @@@ ecs TODO test with bogus tenant
dummyTenant := "grpc-test-tenant"

processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor,
tenancy.NewTenancyConfig(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: []string{dummyTenant},
}))
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()

ctxWithTenant := withMetadata(context.Background(), tenantHeader, dummyTenant, t)
ctxNoTenant := context.Background()
mdTwoTenants := metadata.Pairs()
mdTwoTenants.Set(tenantHeader, "a", "b")
ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants)
ctxBadTenant := withMetadata(context.Background(), tenantHeader, "invalid-tenant", t)

withMetadata(context.Background(),
tenantHeader, dummyTenant, t)

tests := []struct {
name string
ctx context.Context
batch model.Batch
mustFail bool
expected []*model.Span
expectedTenants map[string]bool
}{
{
name: "valid tenant",
ctx: ctxWithTenant,
batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},

mustFail: false,
expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}},
expectedTenants: map[string]bool{dummyTenant: true},
},
{
name: "no tenant",
ctx: ctxNoTenant,
batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},

// Because NewGRPCHandler expects a tenant header, it will reject spans without one
mustFail: true,
expected: nil,
expectedTenants: nil,
},
{
name: "two tenants",
ctx: ctxTwoTenants,
batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},

// NewGRPCHandler rejects spans with multiple values for tenant header
mustFail: true,
expected: nil,
expectedTenants: nil,
},
{
name: "invalid tenant",
ctx: ctxBadTenant,
batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},

// NewGRPCHandler rejects spans with multiple values for tenant header
mustFail: true,
expected: nil,
expectedTenants: nil,
},
}
for _, test := range tests {
_, err := client.PostSpans(test.ctx, &api_v2.PostSpansRequest{
Batch: test.batch,
})
if test.mustFail {
require.Error(t, err, "test: %s", test.name)
} else {
require.NoError(t, err, "test: %s", test.name)
}
assert.Equal(t, test.expected, processor.getSpans(), "test: %s", test.name)
assert.Equal(t, test.expectedTenants, processor.getTenants(), "test: %s", test.name)
processor.reset()
}
}

// withIncomingMetadata returns a Context with metadata for a server to receive
func withIncomingMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context {
t.Helper()

md := metadata.New(map[string]string{headerName: headerValue})
return metadata.NewIncomingContext(ctx, md)
}

// @@@ ecs TODO also add this to v3 version
func TestGetTenant(t *testing.T) {
tenantHeader := "some-tenant-header"
validTenants := []string{"acme", "another-example"}

mdTwoTenants := metadata.Pairs()
mdTwoTenants.Set(tenantHeader, "a", "b")
ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants)

tests := []struct {
name string
ctx context.Context
tenant string
mustFail bool
}{
{
name: "valid tenant",
ctx: withIncomingMetadata(context.TODO(), tenantHeader, "acme", t),
mustFail: false,
tenant: "acme",
},
{
name: "no tenant",
ctx: context.TODO(),
mustFail: true,
tenant: "",
},
{
name: "two tenants",
ctx: ctxTwoTenants,
mustFail: true,
tenant: "",
},
{
name: "invalid tenant",
ctx: withIncomingMetadata(context.TODO(), tenantHeader, "an-invalid-tenant", t),
mustFail: true,
tenant: "",
},
}

processor := &mockSpanProcessor{}
handler := NewGRPCHandler(zap.NewNop(), processor, tenancy.NewTenancyConfig(&tenancy.Options{
Enabled: true,
Header: tenantHeader,
Tenants: validTenants,
}))
for _, test := range tests {
tenant, err := handler.validateTenant(test.ctx)
if test.mustFail {
require.Error(t, err, "test: %s", test.name)
} else {
require.NoError(t, err, "test: %s", test.name)
}
assert.Equal(t, test.tenant, tenant, "test: %s", test.name)
}
}
Loading

0 comments on commit c6db258

Please sign in to comment.