Skip to content

Commit

Permalink
Grpc plugin archive storage support (#2317)
Browse files Browse the repository at this point in the history
* Grpc plugin archive storage support

Signed-off-by: Andrew Putilov <[email protected]>

* Introduce service PluginCapabilities

Migrate from ArchiveSupportedRequest to CapabilitiesRequest

Signed-off-by: Andrew Putilov <[email protected]>

* Add comments

Signed-off-by: Andrew Putilov <[email protected]>

* Introduce PluginServices

Signed-off-by: Andrew Putilov <[email protected]>

* Format imports, insert copyright

Signed-off-by: Andrew Putilov <[email protected]>

* Bubble up error on Capabilities() call

Signed-off-by: Andrew Putilov <[email protected]>

* Add empty_test.go

Signed-off-by: Andrew Putilov <[email protected]>

* Remove ArchiveReader, ArchiveWriter

Signed-off-by: Andrew Putilov <[email protected]>

* Pass config.PluginServices to grpc.Serve

Signed-off-by: Andrew Putilov <[email protected]>

* Introduce shared.ArchiveReader/ArchiveWriter

Signed-off-by: Andrew Putilov <[email protected]>

* Improve ArchiveReader/ArchiveWriter according PR comments

Signed-off-by: Andrew Putilov <[email protected]>

* Test fixes

Signed-off-by: Andrew Putilov <[email protected]>

* Validate plugin type

Signed-off-by: Andrew Putilov <[email protected]>

* Add context to WriteSpan method

Signed-off-by: Andrew Putilov <[email protected]>

* Return plugin capabilities according ArchiveImpl property

Signed-off-by: Andrew Putilov <[email protected]>

* Apply changes from master

Signed-off-by: Andrew Putilov <[email protected]>

* Extract PluginServices to shared package, introduce ClientPluginServices on plugin client side

Signed-off-by: Andrew Putilov <[email protected]>

* Improve error text

Signed-off-by: Andrew Putilov <[email protected]>

* Rebase-related updates

Signed-off-by: Andrew Putilov <[email protected]>

* Rename memoryStore to memoryStorePlugin

Signed-off-by: Andrew Putilov <[email protected]>

* minor clean-up

- make internal types private
- add interface validations
- wrap errors with %w

Signed-off-by: Yuri Shkuro <[email protected]>

* Return codes.NotFound in grpc plugin on missing trace

Signed-off-by: Andrew Putilov <[email protected]>

* make fmt

Signed-off-by: Yuri Shkuro <[email protected]>

* Handle spanstore.ErrTraceNotFound in grpcServer.GetArchiveTrace

Signed-off-by: Andrew Putilov <[email protected]>

* Undo unnecessary change

Signed-off-by: Yuri Shkuro <[email protected]>

* Increase factory code coverage

Signed-off-by: Andrew Putilov <[email protected]>

* Trigger codecov again

Signed-off-by: Andrew Putilov <[email protected]>

* Disable coverage in plugin/storage/grpc/config/

Signed-off-by: Yuri Shkuro <[email protected]>

Co-authored-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
3 people authored Sep 26, 2020
1 parent 9ea89d9 commit ecc5091
Show file tree
Hide file tree
Showing 28 changed files with 2,129 additions and 147 deletions.
29 changes: 23 additions & 6 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -45,21 +46,37 @@ func main() {
opts := memory.Options{}
opts.InitFromViper(v)

grpc.Serve(&memoryStore{store: memory.NewStore()})
plugin := &memoryStorePlugin{
store: memory.NewStore(),
archiveStore: memory.NewStore(),
}
grpc.Serve(&shared.PluginServices{
Store: plugin,
ArchiveStore: plugin,
})
}

type memoryStore struct {
store *memory.Store
type memoryStorePlugin struct {
store *memory.Store
archiveStore *memory.Store
}

func (ns *memoryStore) DependencyReader() dependencystore.Reader {
func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader {
return ns.store
}

func (ns *memoryStore) SpanReader() spanstore.Reader {
func (ns *memoryStorePlugin) SpanReader() spanstore.Reader {
return ns.store
}

func (ns *memoryStore) SpanWriter() spanstore.Writer {
func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}

func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer {
return ns.archiveStore
}
1 change: 1 addition & 0 deletions plugin/storage/grpc/config/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requires gRPC plugin binary
38 changes: 31 additions & 7 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ type Configuration struct {
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
type PluginBuilder interface {
Build() (shared.StoragePlugin, error)
Build() (*ClientPluginServices, error)
}

// Build instantiates a StoragePlugin
func (c *Configuration) Build() (shared.StoragePlugin, error) {
// Build instantiates a PluginServices
func (c *Configuration) Build() (*ClientPluginServices, error) {
// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

Expand All @@ -60,18 +66,36 @@ func (c *Configuration) Build() (shared.StoragePlugin, error) {

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %s", err)
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
}

raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %s", err)
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %w", err)
}

// in practice, the type of `raw` is *shared.grpcClient, and type casts below cannot fail
storagePlugin, ok := raw.(shared.StoragePlugin)
if !ok {
return nil, fmt.Errorf("unexpected type for plugin \"%s\"", shared.StoragePluginIdentifier)
return nil, fmt.Errorf("unable to cast %T to shared.StoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
archiveStoragePlugin, ok := raw.(shared.ArchiveStoragePlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}

return storagePlugin, nil
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
},
Capabilities: capabilities,
}, nil
}
15 changes: 0 additions & 15 deletions plugin/storage/grpc/config/empty_test.go

This file was deleted.

41 changes: 38 additions & 3 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -36,7 +37,9 @@ type Factory struct {

builder config.PluginBuilder

store shared.StoragePlugin
store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
capabilities shared.PluginCapabilities
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -65,12 +68,14 @@ func (f *Factory) InitFromOptions(opts Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

store, err := f.builder.Build()
services, err := f.builder.Build()
if err != nil {
return fmt.Errorf("grpc-plugin builder failed to create a store: %w", err)
}

f.store = store
f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand All @@ -89,3 +94,33 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store.DependencyReader(), nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanReader {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanReader(), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanWriter {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanWriter(), nil
}
149 changes: 145 additions & 4 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/config"
grpcConfig "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/mocks"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -41,19 +42,45 @@ type mockPluginBuilder struct {
err error
}

func (b *mockPluginBuilder) Build() (shared.StoragePlugin, error) {
func (b *mockPluginBuilder) Build() (*grpcConfig.ClientPluginServices, error) {
if b.err != nil {
return nil, b.err
}
return b.plugin, nil

services := &grpcConfig.ClientPluginServices{
PluginServices: shared.PluginServices{
Store: b.plugin,
ArchiveStore: b.plugin,
},
}
if b.plugin.capabilities != nil {
services.Capabilities = b.plugin
}

return services, nil
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) {
return mp.capabilities.Capabilities()
}

func (mp *mockPlugin) ArchiveSpanReader() spanstore.Reader {
return mp.archiveReader
}

func (mp *mockPlugin) ArchiveSpanWriter() spanstore.Writer {
return mp.archiveWriter
}

func (mp *mockPlugin) SpanReader() spanstore.Reader {
return mp.spanReader
}
Expand Down Expand Up @@ -84,6 +111,9 @@ func TestGRPCStorageFactory(t *testing.T) {
plugin: &mockPlugin{
spanWriter: new(spanStoreMocks.Writer),
spanReader: new(spanStoreMocks.Reader),
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
capabilities: new(mocks.PluginCapabilities),
dependencyReader: new(dependencyStoreMocks.Reader),
},
}
Expand All @@ -101,14 +131,125 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.NoError(t, err)
assert.NotNil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.NoError(t, err)
assert.NotNil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
customError := errors.New("made-up error")
capabilities.On("Capabilities").
Return(nil, customError)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.Equal(t, err, storage.ErrArchiveStorageNotSupported)
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.Equal(t, err, storage.ErrArchiveStorageNotSupported)
assert.Nil(t, writer)
}

func TestWithConfiguration(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{
err := command.ParseFlags([]string{
"--grpc-storage-plugin.log-level=debug",
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
})
assert.NoError(t, err)
f.InitFromViper(v)
assert.Equal(t, f.options.Configuration.PluginBinary, "noop-grpc-plugin")
assert.Equal(t, f.options.Configuration.PluginConfigurationFile, "config.json")
Expand Down
Loading

0 comments on commit ecc5091

Please sign in to comment.