Skip to content

Commit

Permalink
Support tenant header propagation in query service and grpc-plugin (j…
Browse files Browse the repository at this point in the history
…aegertracing#4151)

Signed-off-by: shubbham1215 <[email protected]>
  • Loading branch information
pavolloffay authored and shubbham1215 committed Mar 5, 2023
1 parent 9a6b416 commit fea96a6
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 6 deletions.
2 changes: 2 additions & 0 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func main() {
storageFactory.AddFlags,
app.AddFlags,
metricsReaderFactory.AddFlags,
// add tenancy flags here to avoid panic caused by double registration in all-in-one
tenancy.AddFlags,
)

if err := command.Execute(); err != nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/tenancy/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,20 @@ func NewClientUnaryInterceptor(tc *Manager) grpc.UnaryClientInterceptor {
return invoker(ctx, method, req, reply, cc, opts...)
})
}

// NewClientStreamInterceptor injects tenant header into gRPC request metadata.
func NewClientStreamInterceptor(tc *Manager) grpc.StreamClientInterceptor {
return grpc.StreamClientInterceptor(func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
if tenant := GetTenant(ctx); tenant != "" {
ctx = metadata.AppendToOutgoingContext(ctx, tc.Header, tenant)
}
return streamer(ctx, desc, cc, method, opts...)
})
}
20 changes: 20 additions & 0 deletions pkg/tenancy/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,23 @@ func TestClientUnaryInterceptor(t *testing.T) {
assert.Equal(t, "acme", tenant)
assert.Same(t, fakeErr, err)
}

func TestClientStreamInterceptor(t *testing.T) {
tm := NewManager(&Options{Enabled: true, Tenants: []string{"acme"}})
interceptor := NewClientStreamInterceptor(tm)
var tenant string
fakeErr := errors.New("foo")
ctx := WithTenant(context.Background(), "acme")
streamer := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
md, ok := metadata.FromOutgoingContext(ctx)
assert.True(t, ok)
ten, err := tenantFromMetadata(md, tm.Header)
require.NoError(t, err)
tenant = ten
return nil, fakeErr
}
stream, err := interceptor(ctx, &grpc.StreamDesc{}, nil, "", streamer)
assert.Same(t, fakeErr, err)
require.Nil(t, stream)
assert.Equal(t, "acme", tenant)
}
25 changes: 20 additions & 5 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

Expand All @@ -44,6 +45,7 @@ type Configuration struct {
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options

pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
Expand Down Expand Up @@ -99,6 +101,12 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices,

ctx, cancel := context.WithTimeout(context.Background(), c.RemoteConnectTimeout)
defer cancel()

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}
conn, err := grpc.DialContext(ctx, c.RemoteServerAddr, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to remote storage: %w", err)
Expand All @@ -116,9 +124,19 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices,
}

func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())),
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())),
}

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: shared.Handshake,
VersionedPlugins: map[int]plugin.PluginSet{
Expand All @@ -129,10 +147,7 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices,
Logger: hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(c.PluginLogLevel),
}),
GRPCDialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())),
grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())),
},
GRPCDialOptions: opts,
})

runtime.SetFinalizer(client, func(c *plugin.Client) {
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
)

Expand Down Expand Up @@ -71,5 +72,6 @@ func (opt *Options) InitFromViper(v *viper.Viper) error {
return fmt.Errorf("failed to parse gRPC storage TLS options: %w", err)
}
opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout)
opt.Configuration.TenancyOpts = tenancy.InitFromViper(v)
return nil
}
6 changes: 5 additions & 1 deletion plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,26 @@ import (
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

func TestOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags)
v, command := config.Viperize(opts.AddFlags, tenancy.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
"--multi-tenancy.header=x-scope-orgid",
})
assert.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, opts.Configuration.PluginBinary, "noop-grpc-plugin")
assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json")
assert.Equal(t, opts.Configuration.PluginLogLevel, "debug")
assert.Equal(t, false, opts.Configuration.TenancyOpts.Enabled)
assert.Equal(t, "x-scope-orgid", opts.Configuration.TenancyOpts.Header)
}

func TestRemoteOptionsWithFlags(t *testing.T) {
Expand Down

0 comments on commit fea96a6

Please sign in to comment.