Skip to content

Commit

Permalink
feature: add streamingwriterspanplugin
Browse files Browse the repository at this point in the history
Signed-off-by: vuuihc <[email protected]>
  • Loading branch information
vuuihc committed Apr 27, 2022
1 parent b891773 commit 9e555df
Show file tree
Hide file tree
Showing 25 changed files with 1,063 additions and 108 deletions.
5 changes: 3 additions & 2 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func main() {

memStorePlugin := grpcMemory.NewStoragePlugin(memory.NewStore(), memory.NewStore())
grpc.ServeWithGRPCServer(&shared.PluginServices{
Store: memStorePlugin,
ArchiveStore: memStorePlugin,
Store: memStorePlugin,
ArchiveStore: memStorePlugin,
StreamingSpanWriter: memStorePlugin,
}, func(options []googleGRPC.ServerOption) *googleGRPC.Server {
return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{
googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ grpc.Serve(&shared.PluginServices{
})
```

To support writing span via client stream (which can enlarge the throughput), you must fill `StreamingSpanWriter` with the same plugin with `Store` field. Note that use streaming spanWriter may make the `save_by_svr` metric inaccurate, in which case you'll need to pay attention to the metrics provided by plugin. It is disabled default and can be enabled by providing the flag `--grpc-storage-plugin.writer-type=streaming`

Running with a plugin
---------------------
A plugin can be run using the `all-in-one` application within the top level `cmd` package of the Jaeger project. To do this
Expand Down
14 changes: 12 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Configuration struct {
PluginBinary string `yaml:"binary" mapstructure:"binary"`
PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"`
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
PluginWriterType string `yaml:"writer-type" mapstructure:"writer_type"`
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
Expand Down Expand Up @@ -159,6 +160,14 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices,
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
var streamingSpanWriterPlugin shared.StreamingSpanWriterPlugin
if c.PluginWriterType == "streaming" {
streamingSpanWriterPlugin, ok = raw.(shared.StreamingSpanWriterPlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.StreamingSpanWriterPlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
Expand All @@ -171,8 +180,9 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices,

return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
}, nil
Expand Down
20 changes: 16 additions & 4 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"errors"
"flag"
"fmt"
"io"
Expand All @@ -38,9 +39,10 @@ type Factory struct {

builder config.PluginBuilder

store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
capabilities shared.PluginCapabilities
store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities
}

var _ io.Closer = (*Factory)(nil)
Expand Down Expand Up @@ -81,6 +83,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand All @@ -92,7 +95,16 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return f.store.SpanWriter(), nil
if f.streamingSpanWriter == nil {
return f.store.SpanWriter(), nil
}
if f.capabilities == nil {
return nil, errors.New("streaming writer not supported")
}
if capabilities, err := f.capabilities.Capabilities(); err != nil || !capabilities.StreamingSpanWriter {
return nil, fmt.Errorf("streaming writer not supported, capabilities %v, err %v", capabilities, err)
}
return f.streamingSpanWriter.StreamingSpanWriter(), nil
}

// CreateDependencyReader implements storage.Factory
Expand Down
107 changes: 91 additions & 16 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (
var _ storage.Factory = new(Factory)

type mockPluginBuilder struct {
plugin *mockPlugin
err error
plugin *mockPlugin
writerType string
err error
}

func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginServices, error) {
Expand All @@ -53,6 +54,9 @@ func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginS
ArchiveStore: b.plugin,
},
}
if b.writerType == "streaming" {
services.PluginServices.StreamingSpanWriter = b.plugin
}
if b.plugin.capabilities != nil {
services.Capabilities = b.plugin
}
Expand All @@ -65,12 +69,13 @@ func (b *mockPluginBuilder) Close() error {
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
streamingSpanWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) {
Expand All @@ -93,6 +98,10 @@ func (mp *mockPlugin) SpanWriter() spanstore.Writer {
return mp.spanWriter
}

func (mp *mockPlugin) StreamingSpanWriter() spanstore.Writer {
return mp.streamingSpanWriter
}

func (mp *mockPlugin) DependencyReader() dependencystore.Reader {
return mp.dependencyReader
}
Expand Down Expand Up @@ -143,16 +152,19 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) {
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}, nil)
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
StreamingSpanWriter: true,
}, nil).Times(3)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
streamingSpanWriter: new(spanStoreMocks.Writer),
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

Expand All @@ -163,6 +175,9 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.NoError(t, err)
assert.NotNil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.streamingSpanWriter.StreamingSpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
Expand All @@ -173,15 +188,17 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
StreamingSpanWriter: false,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -193,6 +210,9 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
Expand All @@ -210,6 +230,7 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -221,6 +242,9 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
Expand All @@ -232,6 +256,7 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -243,6 +268,9 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.Equal(t, err, storage.ErrArchiveStorageNotSupported)
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestWithConfiguration(t *testing.T) {
Expand All @@ -252,12 +280,14 @@ func TestWithConfiguration(t *testing.T) {
"--grpc-storage-plugin.log-level=debug",
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.writer-type=streaming",
})
assert.NoError(t, err)
f.InitFromViper(v, zap.NewNop())
assert.Equal(t, f.options.Configuration.PluginBinary, "noop-grpc-plugin")
assert.Equal(t, f.options.Configuration.PluginConfigurationFile, "config.json")
assert.Equal(t, f.options.Configuration.PluginLogLevel, "debug")
assert.Equal(t, f.options.Configuration.PluginWriterType, "streaming")
assert.NoError(t, f.Close())
}

Expand All @@ -272,3 +302,48 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o, f.options)
assert.Equal(t, &o.Configuration, f.builder)
}

func TestStreamingSpanWriterFactory_CapabilitiesNil(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v, zap.NewNop())

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
_, err := f.CreateSpanWriter()
assert.ErrorContains(t, err, "not supported")
}

func TestStreamingSpanWriterFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v, zap.NewNop())

capabilities := new(mocks.PluginCapabilities)
customError := errors.New("made-up error")
capabilities.On("Capabilities").
Return(nil, customError).Once().
On("Capabilities").Return(&shared.Capabilities{}, nil).Once()

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
capabilities: capabilities,
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
_, err := f.CreateSpanWriter()
assert.ErrorContains(t, err, "not supported")
_, err = f.CreateSpanWriter()
assert.ErrorContains(t, err, "not supported")
}
1 change: 1 addition & 0 deletions plugin/storage/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc
shared.StoragePluginIdentifier: &shared.StorageGRPCPlugin{
Impl: services.Store,
ArchiveImpl: services.ArchiveStore,
StreamImpl: services.StreamingSpanWriter,
},
},
},
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/grpc/memory/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (ns *storagePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *storagePlugin) StreamingSpanWriter() spanstore.Writer {
return ns.store
}

func (ns *storagePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/grpc/memory/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestPluginUsesMemoryStorage(t *testing.T) {
assert.Equal(t, mainStorage, memStorePlugin.DependencyReader())
assert.Equal(t, mainStorage, memStorePlugin.SpanReader())
assert.Equal(t, mainStorage, memStorePlugin.SpanWriter())
assert.Equal(t, mainStorage, memStorePlugin.StreamingSpanWriter())
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanReader())
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanWriter())

Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ const (
pluginBinary = "grpc-storage-plugin.binary"
pluginConfigurationFile = "grpc-storage-plugin.configuration-file"
pluginLogLevel = "grpc-storage-plugin.log-level"
pluginWriterType = "grpc-storage-plugin.writer-type"
remotePrefix = "grpc-storage"
remoteServer = remotePrefix + ".server"
remoteConnectionTimeout = remotePrefix + ".connection-timeout"
defaultPluginLogLevel = "warn"
defaultWriterType = "unary"
defaultConnectionTimeout = time.Duration(5 * time.Second)
)

Expand All @@ -55,6 +57,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(pluginBinary, "", "The location of the plugin binary")
flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg")
flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger")
flagSet.String(pluginWriterType, defaultWriterType, "The plugin span writer's grpc rpc type")
flagSet.String(remoteServer, "", "The remote storage gRPC server address")
flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote storage gRPC server connection timeout")

Expand All @@ -65,6 +68,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) error {
opt.Configuration.PluginBinary = v.GetString(pluginBinary)
opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile)
opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel)
opt.Configuration.PluginWriterType = v.GetString(pluginWriterType)
opt.Configuration.RemoteServerAddr = v.GetString(remoteServer)
var err error
opt.Configuration.RemoteTLS, err = tlsFlagsConfig().InitFromViper(v)
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ func TestOptionsWithFlags(t *testing.T) {
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
"--grpc-storage-plugin.writer-type=streaming",
})
assert.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, opts.Configuration.PluginBinary, "noop-grpc-plugin")
assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json")
assert.Equal(t, opts.Configuration.PluginLogLevel, "debug")
assert.Equal(t, opts.Configuration.PluginWriterType, "streaming")
}

func TestRemoteOptionsWithFlags(t *testing.T) {
Expand Down
Loading

0 comments on commit 9e555df

Please sign in to comment.