Skip to content

Commit

Permalink
Flow tenant from GRPC PostSpans header through processors
Browse files Browse the repository at this point in the history
Signed-off-by: Ed Snible <[email protected]>
  • Loading branch information
esnible committed May 31, 2022
1 parent 557ff71 commit e982edf
Show file tree
Hide file tree
Showing 11 changed files with 627 additions and 51 deletions.
10 changes: 10 additions & 0 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/ports"
)
Expand Down Expand Up @@ -152,6 +153,8 @@ type GRPCOptions struct {
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
// Tenancy configures tenancy for endpoints that collect spans
Tenancy tenancy.Options
}

// AddFlags adds flags for CollectorOptions
Expand All @@ -172,6 +175,8 @@ func AddFlags(flags *flag.FlagSet) {
flags.String(flagZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all")
flags.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)")
tlsZipkinFlagsConfig.AddFlags(flags)

tenancy.AddFlags(flags)
}

func addHTTPFlags(flags *flag.FlagSet, cfg serverFlagsConfig, defaultHostPort string) {
Expand Down Expand Up @@ -219,6 +224,11 @@ func (opts *GRPCOptions) initFromViper(v *viper.Viper, logger *zap.Logger, cfg s
} else {
return fmt.Errorf("failed to parse gRPC TLS options: %w", err)
}
if tenancy, err := tenancy.InitFromViper(v); err == nil {
opts.Tenancy = tenancy
} else {
return fmt.Errorf("failed to parse Tenancy options: %w", err)
}

return nil
}
Expand Down
36 changes: 36 additions & 0 deletions cmd/collector/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,39 @@ func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) {
assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge)
assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace)
}

func TestCollectorOptionsWithFlags_CheckNoTenancy(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
c.InitFromViper(v, zap.NewNop())

assert.Equal(t, false, c.GRPC.Tenancy.Enabled)
}

func TestCollectorOptionsWithFlags_CheckSimpleTenancy(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--multi_tenancy.enabled=true",
})
c.InitFromViper(v, zap.NewNop())

assert.Equal(t, true, c.GRPC.Tenancy.Enabled)
assert.Equal(t, "x-tenant", c.GRPC.Tenancy.Header)
}

func TestCollectorOptionsWithFlags_CheckFullTenancy(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--multi_tenancy.enabled=true",
"--multi_tenancy.header=custom-tenant-header",
"--multi_tenancy.tenants=acme,hardware-store",
})
c.InitFromViper(v, zap.NewNop())

assert.Equal(t, true, c.GRPC.Tenancy.Enabled)
assert.Equal(t, "custom-tenant-header", c.GRPC.Tenancy.Header)
assert.Equal(t, []string{"acme", "hardware-store"}, c.GRPC.Tenancy.Tenants)
}
41 changes: 38 additions & 3 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,24 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // register zip encoding
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
batchConsumer batchConsumer
tenancyConfig *tenancy.TenancyConfig
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler {
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler {
return &GRPCHandler{
logger: logger,
batchConsumer: batchConsumer{
Expand All @@ -45,13 +48,20 @@ func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *
SpanFormat: processor.ProtoSpanFormat,
},
},
tenancyConfig: tenancyConfig,
}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
tenant, err := g.validateTenant(ctx)
if err != nil {
g.logger.Error("rejecting spans (tenancy)", zap.Error(err))
return nil, err
}

batch := &r.Batch
err := g.batchConsumer.consume(batch)
err = g.batchConsumer.consume(batch, tenant)
return &api_v2.PostSpansResponse{}, err
}

Expand All @@ -61,7 +71,7 @@ type batchConsumer struct {
spanOptions processor.SpansOptions
}

func (c *batchConsumer) consume(batch *model.Batch) error {
func (c *batchConsumer) consume(batch *model.Batch, tenant string) error {
for _, span := range batch.Spans {
if span.GetProcess() == nil {
span.Process = batch.Process
Expand All @@ -70,6 +80,7 @@ func (c *batchConsumer) consume(batch *model.Batch) error {
_, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
Tenant: tenant,
})
if err != nil {
if err == processor.ErrBusy {
Expand All @@ -80,3 +91,27 @@ func (c *batchConsumer) consume(batch *model.Batch) error {
}
return nil
}

func (g *GRPCHandler) validateTenant(ctx context.Context) (string, error) {
if !g.tenancyConfig.Enabled {
return "", nil
}

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md[g.tenancyConfig.Header]
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

if !g.tenancyConfig.Valid(tenants[0]) {
return "", status.Errorf(codes.PermissionDenied, "unknown tenant")
}

return tenants[0], nil
}
Loading

0 comments on commit e982edf

Please sign in to comment.