Skip to content

Commit

Permalink
Introduce service PluginCapabilities
Browse files Browse the repository at this point in the history
Migrate from ArchiveSupportedRequest to CapabilitiesRequest

Signed-off-by: Andrew Putilov <[email protected]>
  • Loading branch information
m8rge committed Jul 20, 2020
1 parent 4d97981 commit a3549ae
Show file tree
Hide file tree
Showing 18 changed files with 537 additions and 311 deletions.
11 changes: 1 addition & 10 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -46,21 +45,13 @@ func main() {
opts := memory.Options{}
opts.InitFromViper(v)

grpc.Serve(&memoryStore{store: memory.NewStore()})
grpc.Serve(&memoryStore{store: memory.NewStore()}, nil, nil)
}

type memoryStore struct {
store *memory.Store
}

func (ns *memoryStore) ArchiveSpanReader() shared.ArchiveReader {
return nil
}

func (ns *memoryStore) ArchiveSpanWriter() shared.ArchiveWriter {
return nil
}

func (ns *memoryStore) DependencyReader() dependencystore.Reader {
return ns.store
}
Expand Down
15 changes: 9 additions & 6 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Configuration struct {
}

// Build instantiates a StoragePlugin
func (c *Configuration) Build() (shared.StoragePlugin, error) {
func (c *Configuration) Build() (shared.StoragePlugin, shared.ArchiveStoragePlugin, shared.PluginCapabilities, error) {
// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

Expand All @@ -55,23 +55,26 @@ func (c *Configuration) Build() (shared.StoragePlugin, error) {

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %s", err)
return nil, nil, nil, fmt.Errorf("error attempting to connect to plugin rpc client: %s", err)
}

raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %s", err)
return nil, nil, nil, fmt.Errorf("unable to retrieve storage plugin instance: %s", err)
}

storagePlugin, ok := raw.(shared.StoragePlugin)
if !ok {
return nil, fmt.Errorf("unexpected type for plugin \"%s\"", shared.StoragePluginIdentifier)
return nil, nil, nil, fmt.Errorf("unexpected type for plugin \"%s\"", shared.StoragePluginIdentifier)
}

return storagePlugin, nil
archiveStoragePlugin := raw.(shared.ArchiveStoragePlugin)
capabilities := raw.(shared.PluginCapabilities)

return storagePlugin, archiveStoragePlugin, capabilities, nil
}

// PluginBuilder is used to create storage plugins
type PluginBuilder interface {
Build() (shared.StoragePlugin, error)
Build() (shared.StoragePlugin, shared.ArchiveStoragePlugin, shared.PluginCapabilities, error)
}
27 changes: 18 additions & 9 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package grpc

import (
"context"
"flag"

"github.com/spf13/viper"
Expand All @@ -37,7 +36,9 @@ type Factory struct {

builder config.PluginBuilder

store shared.StoragePlugin
store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
capabilities shared.PluginCapabilities
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -66,12 +67,14 @@ func (f *Factory) InitFromOptions(opts Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

store, err := f.builder.Build()
store, archiveStore, capabilities, err := f.builder.Build()
if err != nil {
return err
}

f.store = store
f.archiveStore = archiveStore
f.capabilities = capabilities
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand All @@ -93,18 +96,24 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
supported, _ := f.store.ArchiveSpanReader().ArchiveSupported(context.Background())
if !supported {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
return &ArchiveReader{impl: f.store.ArchiveSpanReader()}, nil
capabilities, _ := f.capabilities.Capabilities()
if capabilities == nil || !capabilities.ArchiveSpanReader {
return nil, storage.ErrArchiveStorageNotSupported
}
return &ArchiveReader{impl: f.archiveStore.ArchiveSpanReader()}, nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
supported, _ := f.store.ArchiveSpanReader().ArchiveSupported(context.Background())
if !supported {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, _ := f.capabilities.Capabilities()
if capabilities == nil || !capabilities.ArchiveSpanWriter {
return nil, storage.ErrArchiveStorageNotSupported
}
return &ArchiveWriter{impl: f.store.ArchiveSpanWriter()}, nil
return &ArchiveWriter{impl: f.archiveStore.ArchiveSpanWriter()}, nil
}
42 changes: 28 additions & 14 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ package grpc

import (
"errors"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared/extra"
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

Expand All @@ -42,21 +42,26 @@ type mockPluginBuilder struct {
err error
}

func (b *mockPluginBuilder) Build() (shared.StoragePlugin, error) {
func (b *mockPluginBuilder) Build() (shared.StoragePlugin, shared.ArchiveStoragePlugin, shared.PluginCapabilities, error) {
if b.err != nil {
return nil, b.err
return nil, nil, nil, b.err
}
return b.plugin, nil
return b.plugin, b.plugin, b.plugin, nil
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader shared.ArchiveReader
archiveWriter shared.ArchiveWriter
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) Capabilities() (*extra.Capabilities, error) {
return mp.capabilities.Capabilities()
}

func (mp *mockPlugin) ArchiveSpanReader() shared.ArchiveReader {
return mp.archiveReader
}
Expand Down Expand Up @@ -95,6 +100,7 @@ func TestGRPCStorageFactory(t *testing.T) {
spanReader: new(spanStoreMocks.Reader),
archiveReader: new(mocks.ArchiveReader),
archiveWriter: new(mocks.ArchiveWriter),
capabilities: new(mocks.PluginCapabilities),
dependencyReader: new(dependencyStoreMocks.Reader),
},
}
Expand All @@ -112,17 +118,21 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCArchiveStorageFactory(t *testing.T) {
func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

archiveReader := new(mocks.ArchiveReader)
archiveReader.On("ArchiveSupported", mock.Anything).
Return(true, nil)
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&extra.Capabilities{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveReader: archiveReader,
capabilities: capabilities,
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -136,17 +146,21 @@ func TestGRPCArchiveStorageFactory(t *testing.T) {
assert.IsType(t, &ArchiveWriter{}, writer)
}

func TestGRPCArchiveStorageDisabledFactory(t *testing.T) {
func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

archiveReader := new(mocks.ArchiveReader)
archiveReader.On("ArchiveSupported", mock.Anything).
Return(false, nil)
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&extra.Capabilities{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveReader: archiveReader,
capabilities: capabilities,
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand Down
12 changes: 8 additions & 4 deletions plugin/storage/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@ import (
)

// Serve creates a plugin configuration using the implementation of StoragePlugin and then serves it.
func Serve(implementation shared.StoragePlugin) {
ServeWithGRPCServer(implementation, plugin.DefaultGRPCServer)
func Serve(impl shared.StoragePlugin, archiveImpl shared.ArchiveStoragePlugin, capabilitiesImpl shared.PluginCapabilities) {
ServeWithGRPCServer(impl, archiveImpl, capabilitiesImpl, plugin.DefaultGRPCServer)
}

// ServeWithGRPCServer creates a plugin configuration using the implementation of StoragePlugin and
// function to create grpcServer, and then serves it.
func ServeWithGRPCServer(implementation shared.StoragePlugin, grpcServer func([]grpc.ServerOption) *grpc.Server) {
func ServeWithGRPCServer(impl shared.StoragePlugin, archiveImpl shared.ArchiveStoragePlugin,
capabilitiesImpl shared.PluginCapabilities, grpcServer func([]grpc.ServerOption) *grpc.Server,
) {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: shared.Handshake,
VersionedPlugins: map[int]plugin.PluginSet{
1: map[string]plugin.Plugin{
shared.StoragePluginIdentifier: &shared.StorageGRPCPlugin{
Impl: implementation,
Impl: impl,
ArchiveImpl: archiveImpl,
CapabilitiesImpl: capabilitiesImpl,
},
},
},
Expand Down
24 changes: 14 additions & 10 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,6 @@ message FindTraceIDsResponse {
];
}

// empty; extensible in the future
message ArchiveSupportedRequest {

}

message ArchiveSupportedResponse {
bool supported = 1;
}

service SpanWriterPlugin {
// spanstore/Writer
rpc WriteSpan(WriteSpanRequest) returns (WriteSpanResponse);
Expand All @@ -165,10 +156,23 @@ service ArchiveSpanWriterPlugin {
service ArchiveSpanReaderPlugin {
// spanstore/Reader
rpc GetArchiveTrace(GetTraceRequest) returns (stream SpansResponseChunk);
rpc ArchiveSupported(ArchiveSupportedRequest) returns (ArchiveSupportedResponse);
}

service DependenciesReaderPlugin {
// dependencystore/Reader
rpc GetDependencies(GetDependenciesRequest) returns (GetDependenciesResponse);
}

// empty; extensible in the future
message CapabilitiesRequest {

}

message CapabilitiesResponse {
bool archiveSpanReader = 1;
bool archiveSpanWriter = 2;
}

service PluginCapabilities {
rpc Capabilities(CapabilitiesRequest) returns (CapabilitiesResponse);
}
6 changes: 6 additions & 0 deletions plugin/storage/grpc/shared/extra/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package extra

type Capabilities struct {
ArchiveSpanReader bool
ArchiveSpanWriter bool
}
31 changes: 18 additions & 13 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ package shared
import (
"context"
"fmt"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared/extra"
"google.golang.org/grpc/codes"
"io"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

Expand All @@ -36,6 +37,7 @@ type grpcClient struct {
writerClient storage_v1.SpanWriterPluginClient
archiveReaderClient storage_v1.ArchiveSpanReaderPluginClient
archiveWriterClient storage_v1.ArchiveSpanWriterPluginClient
capabilitiesClient storage_v1.PluginCapabilitiesClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
}

Expand Down Expand Up @@ -237,6 +239,21 @@ func (c *grpcClient) GetArchiveTrace(ctx context.Context, traceID model.TraceID)
return readTrace(stream)
}

func (c *grpcClient) Capabilities() (*extra.Capabilities, error) {
capabilities, err := c.capabilitiesClient.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{})
if status.Code(err) == codes.Unimplemented {
return &extra.Capabilities{}, nil
}
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

return &extra.Capabilities{
ArchiveSpanReader: capabilities.ArchiveSpanReader,
ArchiveSpanWriter: capabilities.ArchiveSpanWriter,
}, nil
}

func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace, error) {
trace := model.Trace{}
for received, err := stream.Recv(); err != io.EOF; received, err = stream.Recv() {
Expand All @@ -256,15 +273,3 @@ func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace,

return &trace, nil
}

func (c *grpcClient) ArchiveSupported(ctx context.Context) (bool, error) {
response, err := c.archiveReaderClient.ArchiveSupported(ctx, &storage_v1.ArchiveSupportedRequest{})
if status.Code(err) == codes.Unimplemented {
return false, nil
}
if err != nil {
return false, fmt.Errorf("plugin error: %w", err)
}

return response.Supported, nil
}
Loading

0 comments on commit a3549ae

Please sign in to comment.