Skip to content

Commit

Permalink
Wire up lock/samplingstore
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <[email protected]>
  • Loading branch information
joe-elliott committed Apr 28, 2021
1 parent 42aedd3 commit 8113686
Show file tree
Hide file tree
Showing 21 changed files with 219 additions and 25 deletions.
6 changes: 5 additions & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}
lock, samplingStore, err := storageFactory.CreateLockAndSamplingStore()
if err != nil {
logger.Fatal("Failed to create lock and sampling store for adaptive sampling", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v)
if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
if err := strategyStoreFactory.Initialize(metricsFactory, logger, lock, samplingStore); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
}
strategyStore, err := strategyStoreFactory.CreateStrategyStore()
Expand Down
4 changes: 3 additions & 1 deletion cmd/collector/app/sampling/strategystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package strategystore

import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
)
Expand All @@ -27,7 +29,7 @@ import (
// plugin.Configurable
type Factory interface {
// Initialize performs internal initialization of the factory.
Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error
Initialize(metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) error

// CreateStrategyStore initializes the StrategyStore and returns it.
CreateStrategyStore() (StrategyStore, error)
Expand Down
6 changes: 5 additions & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,13 @@ func main() {
if err != nil {
logger.Fatal("Failed to create span writer", zap.Error(err))
}
lock, samplingStore, err := storageFactory.CreateLockAndSamplingStore()
if err != nil {
logger.Fatal("Failed to create lock and sampling store for adaptive sampling", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v)
if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
if err := strategyStoreFactory.Initialize(metricsFactory, logger, lock, samplingStore); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
}
strategyStore, err := strategyStoreFactory.CreateStrategyStore()
Expand Down
5 changes: 5 additions & 0 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)
Expand Down Expand Up @@ -281,6 +283,9 @@ func (*fakeStorageFactory1) Initialize(metricsFactory metrics.Factory, logger *z
func (*fakeStorageFactory1) CreateSpanReader() (spanstore.Reader, error) { return nil, nil }
func (*fakeStorageFactory1) CreateSpanWriter() (spanstore.Writer, error) { return nil, nil }
func (*fakeStorageFactory1) CreateDependencyReader() (dependencystore.Reader, error) { return nil, nil }
func (*fakeStorageFactory1) CreateLockAndSamplingStore() (distributedlock.Lock, samplingstore.Store, error) {
return nil, nil, nil
}

func (f *fakeStorageFactory2) CreateArchiveSpanReader() (spanstore.Reader, error) { return f.r, f.rErr }
func (f *fakeStorageFactory2) CreateArchiveSpanWriter() (spanstore.Writer, error) { return f.w, f.wErr }
Expand Down
17 changes: 15 additions & 2 deletions plugin/sampling/strategystore/adaptive/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@
package adaptive

import (
"errors"
"flag"

"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

// Factory implements strategystore.Factory for an adaptive strategy store.
type Factory struct {
options *Options
logger *zap.Logger
metricsFactory metrics.Factory
lock distributedlock.Lock
store samplingstore.Store
}

// NewFactory creates a new Factory.
Expand All @@ -37,6 +42,8 @@ func NewFactory() *Factory {
options: &Options{},
logger: zap.NewNop(),
metricsFactory: metrics.NullFactory,
lock: nil,
store: nil,
}
}

Expand All @@ -51,15 +58,21 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
}

// Initialize implements strategystore.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) error {
if lock == nil || store == nil {
return errors.New("lock or samplingStore nil. adaptive sampling only supported with Cassandra backend") // todo(jpe): better check/error msg
}

f.logger = logger
f.metricsFactory = metricsFactory
f.lock = lock
f.store = store
return nil
}

// CreateStrategyStore implements strategystore.Factory
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, error) {
p, err := NewStrategyStore(*f.options, f.logger)
p, err := NewStrategyStore(*f.options, f.metricsFactory, f.logger, f.lock, f.store)
if err != nil {
return nil, err
}
Expand Down
31 changes: 30 additions & 1 deletion plugin/sampling/strategystore/adaptive/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
ss "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin"
Expand Down Expand Up @@ -61,7 +62,35 @@ func TestFactory(t *testing.T) {
assert.Equal(t, time.Second, f.options.LeaderLeaseRefreshInterval)
assert.Equal(t, time.Second*2, f.options.FollowerLeaseRefreshInterval)

assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop(), &mockLock{}, &mockStore{}))
_, err := f.CreateStrategyStore()
assert.NoError(t, err)
}

type mockStore struct{}

func (m *mockStore) InsertThroughput(throughput []*model.Throughput) error {
return nil
}
func (m *mockStore) InsertProbabilitiesAndQPS(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS) error {
return nil
}
func (m *mockStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
return nil, nil
}
func (m *mockStore) GetProbabilitiesAndQPS(start, end time.Time) (map[string][]model.ServiceOperationData, error) {
return nil, nil
}
func (m *mockStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
return nil, nil
}

type mockLock struct{}

func (m *mockLock) Acquire(resource string, ttl time.Duration) (acquired bool, err error) {
return true, nil
}

func (m *mockLock) Forfeit(resource string) (forfeited bool, err error) {
return true, nil
}
12 changes: 6 additions & 6 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/plugin/sampling/calculationstrategy"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/storage/samplingstore"
Expand All @@ -43,6 +44,8 @@ const (

// The number of past entries for samplingCache the leader keeps in memory
serviceCacheSize = 25

defaultResourceName = "sampling_store_leader"
)

var (
Expand Down Expand Up @@ -521,7 +524,6 @@ func (p *processor) generateDefaultSamplingStrategyResponse() *sampling.Sampling
type strategyStore struct {
logger *zap.Logger
processor *processor
ctx context.Context
cancelFunc context.CancelFunc
}

Expand All @@ -541,16 +543,14 @@ func (h *strategyStore) GetSamplingStrategy(_ context.Context, serviceName strin
}

// NewStrategyStore creates a strategy store that holds adaptive sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (*processor, error) {
// ctx, _ := context.WithCancel(context.Background())

func NewStrategyStore(options Options, metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) (*processor, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}

// How to initialize storage, electionParticipant, metricsFactory for the NewProcessor ?
p, err := NewProcessor(options, hostname, nil, nil, nil, logger)
participant := leaderelection.NewElectionParticipant(lock, defaultResourceName, leaderelection.ElectionParticipantOptions{}) // todo(jpe) : wire up options/resource name
p, err := NewProcessor(options, hostname, store, participant, metricsFactory, logger)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions plugin/sampling/strategystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive"
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

type StrategyStoreType string
Expand Down Expand Up @@ -86,9 +88,9 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
}

// Initialize implements strategystore.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) error {
for _, factory := range f.factories {
if err := factory.Initialize(metricsFactory, logger); err != nil {
if err := factory.Initialize(metricsFactory, logger, lock, store); err != nil {
return err
}
}
Expand Down
41 changes: 38 additions & 3 deletions plugin/sampling/strategystore/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ import (
"flag"
"os"
"testing"
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
ss "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

func clearEnv() {
Expand All @@ -48,13 +52,16 @@ func TestNewFactory(t *testing.T) {
mock := new(mockFactory)
f.factories["static"] = mock

assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
lock := &mockLock{}
store := &mockStore{}

assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop(), lock, store))
_, err = f.CreateStrategyStore()
assert.NoError(t, err)

// force the mock to return errors
mock.retError = true
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "error initializing store")
assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop(), lock, store), "error initializing store")
_, err = f.CreateStrategyStore()
assert.EqualError(t, err, "error creating store")

Expand Down Expand Up @@ -110,9 +117,37 @@ func (f *mockFactory) CreateStrategyStore() (ss.StrategyStore, error) {
return nil, nil
}

func (f *mockFactory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
func (f *mockFactory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger, lock distributedlock.Lock, store samplingstore.Store) error {
if f.retError {
return errors.New("error initializing store")
}
return nil
}

type mockStore struct{}

func (m *mockStore) InsertThroughput(throughput []*model.Throughput) error {
return nil
}
func (m *mockStore) InsertProbabilitiesAndQPS(hostname string, probabilities model.ServiceOperationProbabilities, qps model.ServiceOperationQPS) error {
return nil
}
func (m *mockStore) GetThroughput(start, end time.Time) ([]*model.Throughput, error) {
return nil, nil
}
func (m *mockStore) GetProbabilitiesAndQPS(start, end time.Time) (map[string][]model.ServiceOperationData, error) {
return nil, nil
}
func (m *mockStore) GetLatestProbabilities() (model.ServiceOperationProbabilities, error) {
return nil, nil
}

type mockLock struct{}

func (m *mockLock) Acquire(resource string, ttl time.Duration) (acquired bool, err error) {
return true, nil
}

func (m *mockLock) Forfeit(resource string) (forfeited bool, err error) {
return true, nil
}
4 changes: 3 additions & 1 deletion plugin/sampling/strategystore/static/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/storage/samplingstore"
)

// Factory implements strategystore.Factory for a static strategy store.
Expand Down Expand Up @@ -49,7 +51,7 @@ func (f *Factory) InitFromViper(v *viper.Viper) {
}

// Initialize implements strategystore.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
func (f *Factory) Initialize(_ metrics.Factory, logger *zap.Logger, _ distributedlock.Lock, _ samplingstore.Store) error {
f.logger = logger
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/sampling/strategystore/static/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestFactory(t *testing.T) {
command.ParseFlags([]string{"--sampling.strategies-file=fixtures/strategies.json"})
f.InitFromViper(v)

assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop(), nil, nil))
_, err := f.CreateStrategyStore()
assert.NoError(t, err)
}
8 changes: 8 additions & 0 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/distributedlock"
depStore "github.com/jaegertracing/jaeger/plugin/storage/badger/dependencystore"
badgerStore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -167,6 +170,11 @@ func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return depStore.NewDependencyStore(sr), nil
}

// CreateLockAndSamplingStore implements storage.ArchiveFactory. It is not supported by this storage plugin.
func (f *Factory) CreateLockAndSamplingStore() (distributedlock.Lock, samplingstore.Store, error) {
return nil, nil, storage.ErrLockAndSamplingStoreNotSupported
}

// Close Implements io.Closer and closes the underlying storage
func (f *Factory) Close() error {
close(f.maintenanceDone)
Expand Down
Loading

0 comments on commit 8113686

Please sign in to comment.