Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YAAAAS - Yet Another Attempt At Adaptive Sampling #2966

Merged
merged 50 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
5eb2eca
Original PR
joe-elliott Apr 27, 2021
42aedd3
cleaned up tests and config factory logic
joe-elliott Apr 27, 2021
8113686
Wire up lock/samplingstore
joe-elliott Apr 28, 2021
5130f11
changelog
joe-elliott Apr 28, 2021
418ee39
lint
joe-elliott May 4, 2021
3dbe31a
Added method for conditionally instantiating the lock and sampling store
joe-elliott May 4, 2021
b852076
Additional tests
joe-elliott May 4, 2021
160f38a
Merge branch 'master' into adaptive-sampling
joe-elliott May 12, 2021
006dbbf
Passed appropriate settings and added override hostname
joe-elliott May 12, 2021
922eb56
pass hostname to lock
joe-elliott May 17, 2021
7c90815
First pass cassandra tables
joe-elliott May 17, 2021
9c1e181
schema cleanup. start electionParticipant
joe-elliott May 17, 2021
1c08ab9
revert cqlsh path
joe-elliott May 17, 2021
1adb5a0
Merge branch 'master' into adaptive-sampling
joe-elliott May 17, 2021
cf2338c
compose updates
joe-elliott Jun 9, 2021
00da894
Merge branch 'master' into adaptive-sampling
joe-elliott Jun 9, 2021
33fd164
Merge branch 'master' into adaptive-sampling
joe-elliott Jun 14, 2021
044a5be
Pass in processspan
joe-elliott Jun 15, 2021
a0bb1c0
Rearranged hotrod startup to allow env override
joe-elliott Jun 15, 2021
2635b43
call aggregator.Stop()
joe-elliott Jun 15, 2021
6f44e1d
Made aggregator make sense
joe-elliott Jun 15, 2021
0629d93
Added additional processors test
joe-elliott Jun 15, 2021
cc478ce
Added CreateLockAndSamplingStore tests
joe-elliott Jun 15, 2021
52359a1
Merge branch 'master' into adaptive-sampling
joe-elliott Jun 22, 2021
8524c9d
remove overrideHostname in favor of a short random postfix
joe-elliott Jun 25, 2021
b72f470
added hostname
joe-elliott Jun 25, 2021
bfe236d
Merge branch 'master' into adaptive-sampling
joe-elliott Jul 6, 2021
4570cb8
Update storage/factory.go
joe-elliott Jul 6, 2021
c1e6b04
lint really wants crypto/rand
joe-elliott Jul 6, 2021
f10304c
logger to last param
joe-elliott Jul 6, 2021
c0b0b3a
io.Closer
joe-elliott Jul 6, 2021
3701832
log error on aggregator close
joe-elliott Jul 6, 2021
51a6afb
Added mocks to test that the right values are returned
joe-elliott Jul 6, 2021
73dfc99
Improved unique name logging
joe-elliott Jul 6, 2021
469b235
make fmt
joe-elliott Jul 6, 2021
1d54042
Additional testing
joe-elliott Jul 14, 2021
fe31e5a
lint
joe-elliott Jul 14, 2021
ceeff9a
fix test
joe-elliott Jul 14, 2021
91f288e
Merge branch 'master' into adaptive-sampling
jpkrohling Jul 15, 2021
e1e1ea3
tests
joe-elliott Jul 15, 2021
7364871
getParam => samplerParamToFloat
joe-elliott Jul 15, 2021
ff45545
Removed requirelockandsamplingstore
joe-elliott Jul 15, 2021
027faf1
use constants for constants
joe-elliott Jul 20, 2021
e185305
Split out sampling store interface
joe-elliott Jul 20, 2021
7fcacad
lint
joe-elliott Jul 20, 2021
ffb4f84
tests
joe-elliott Jul 20, 2021
c54ac2a
Merge branch 'master' into adaptive-sampling
joe-elliott Aug 18, 2021
9a69318
Merge branch 'master' into adaptive-sampling
yurishkuro Sep 6, 2021
8025133
Merge branch 'master' into adaptive-sampling
yurishkuro Sep 8, 2021
86f42d8
Move changelog entry
yurishkuro Sep 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changes by Version
==================

next release
-------------------
### Backend Changes
#### New Features
* Add support for adaptive sampling with a Cassandra backend. ([#2966](https://github.com/jaegertracing/jaeger/pull/2966), [@joe-elliott](https://github.com/joe-elliott))


1.26.0 (2021-09-06)
-------------------
### Backend Changes
Expand Down Expand Up @@ -70,6 +77,8 @@ Changes by Version
-------------------
### Backend Changes

#### New Features

#### Breaking Changes

* Remove unused `--es-archive.max-span-age` flag ([#2865](https://github.com/jaegertracing/jaeger/pull/2865), [@albertteoh](https://github.com/albertteoh)):
Expand Down
16 changes: 13 additions & 3 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ func main() {
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
strategyStoreFactory, err := ss.NewFactory(ss.FactoryConfigFromEnv())
strategyStoreFactoryConfig, err := ss.FactoryConfigFromEnv()
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory config: %v", err)
}
strategyStoreFactory, err := ss.NewFactory(*strategyStoreFactoryConfig)
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
}
Expand Down Expand Up @@ -121,11 +125,16 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}

ssFactory, err := storageFactory.CreateSamplingStoreFactory()
if err != nil {
logger.Fatal("Failed to create sampling store factory", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v, logger)
if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
if err := strategyStoreFactory.Initialize(metricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
}
strategyStore, err := strategyStoreFactory.CreateStrategyStore()
strategyStore, aggregator, err := strategyStoreFactory.CreateStrategyStore()
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
}
Expand All @@ -143,6 +152,7 @@ by default uses only in-memory database.`,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
Aggregator: aggregator,
HealthCheck: svc.HC(),
})
if err := c.Start(cOpts); err != nil {
Expand Down
17 changes: 16 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Collector struct {
metricsFactory metrics.Factory
spanWriter spanstore.Writer
strategyStore strategystore.StrategyStore
aggregator strategystore.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
Expand All @@ -59,6 +60,7 @@ type CollectorParams struct {
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
StrategyStore strategystore.StrategyStore
Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
}

Expand All @@ -70,6 +72,7 @@ func New(params *CollectorParams) *Collector {
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
strategyStore: params.StrategyStore,
aggregator: params.Aggregator,
hCheck: params.HealthCheck,
}
}
Expand All @@ -83,7 +86,12 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
MetricsFactory: c.metricsFactory,
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor()
var additionalProcessors []ProcessSpan
if c.aggregator != nil {
additionalProcessors = append(additionalProcessors, handleRootSpan(c.aggregator, c.logger))
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
Expand Down Expand Up @@ -169,6 +177,13 @@ func (c *Collector) Close() error {
c.logger.Error("failed to close span processor.", zap.Error(err))
}

// aggregator does not exist for all strategy stores. only Close() if exists.
if c.aggregator != nil {
if err := c.aggregator.Close(); err != nil {
c.logger.Error("failed to close aggregator.", zap.Error(err))
}
}

// watchers actually never return errors from Close
_ = c.tlsGRPCCertWatcherCloser.Close()
_ = c.tlsHTTPCertWatcherCloser.Close()
Expand Down
59 changes: 59 additions & 0 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)
Expand Down Expand Up @@ -98,3 +100,60 @@ func TestCollector_PublishOpts(t *testing.T) {
Value: 42,
})
}

func TestAggregator(t *testing.T) {
// prepare
hc := healthcheck.New()
logger := zap.NewNop()
baseMetrics := metricstest.NewFactory(time.Hour)
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
agg := &mockAggregator{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
Aggregator: agg,
})
collectorOpts := &CollectorOptions{
QueueSize: 10,
NumWorkers: 10,
}

// test
c.Start(collectorOpts)

// assert that aggregator was added to the collector
_, err := c.spanProcessor.ProcessSpans([]*model.Span{
{
OperationName: "y",
Process: &model.Process{
ServiceName: "x",
},
Tags: []model.KeyValue{
{
Key: "sampler.type",
VStr: "probabilistic",
},
{
Key: "sampler.param",
VStr: "1",
},
},
},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
assert.NoError(t, err)

// verify
assert.NoError(t, c.Close())

// assert that aggregator was used
assert.Equal(t, 1, agg.callCount)

// assert that aggregator close was called
assert.Equal(t, 1, agg.closeCount)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package adaptive
package app

import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/model"
)

// HandleRootSpan returns a function that records throughput for root spans
func HandleRootSpan(aggregator Aggregator, logger *zap.Logger) app.ProcessSpan {
// handleRootSpan returns a function that records throughput for root spans
func handleRootSpan(aggregator strategystore.Aggregator, logger *zap.Logger) ProcessSpan {
return func(span *model.Span) {
// TODO simply checking parentId to determine if a span is a root span is not sufficient. However,
// we can be sure that only a root span will have sampler tags.
Expand All @@ -33,7 +33,7 @@ func HandleRootSpan(aggregator Aggregator, logger *zap.Logger) app.ProcessSpan {
if service == "" || span.OperationName == "" {
return
}
samplerType, samplerParam := GetSamplerParams(span, logger)
samplerType, samplerParam := span.GetSamplerParams(logger)
if samplerType == "" {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package adaptive
package app

import (
"testing"
Expand All @@ -24,18 +24,22 @@ import (
)

type mockAggregator struct {
callCount int
callCount int
closeCount int
}

func (t *mockAggregator) RecordThroughput(service, operation, samplerType string, probability float64) {
t.callCount++
}
func (t *mockAggregator) Start() {}
func (t *mockAggregator) Stop() {}
func (t *mockAggregator) Close() error {
t.closeCount++
return nil
}

func TestHandleRootSpan(t *testing.T) {
aggregator := &mockAggregator{}
processor := HandleRootSpan(aggregator, zap.NewNop())
processor := handleRootSpan(aggregator, zap.NewNop())

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
Expand Down
6 changes: 4 additions & 2 deletions cmd/collector/app/sampling/strategystore/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package strategystore
import (
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage"
)

// Factory defines an interface for a factory that can create implementations of different strategy storage components.
Expand All @@ -27,8 +29,8 @@ import (
// plugin.Configurable
type Factory interface {
// Initialize performs internal initialization of the factory.
Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error
Initialize(metricsFactory metrics.Factory, ssFactory storage.SamplingStoreFactory, logger *zap.Logger) error

// CreateStrategyStore initializes the StrategyStore and returns it.
CreateStrategyStore() (StrategyStore, error)
CreateStrategyStore() (StrategyStore, Aggregator, error)
}
13 changes: 13 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package strategystore

import (
"context"
"io"

"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)
Expand All @@ -25,3 +26,15 @@ type StrategyStore interface {
// GetSamplingStrategy retrieves the sampling strategy for the specified service.
GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
}

// Aggregator defines an interface used to aggregate operation throughput.
type Aggregator interface {
// Close() from io.Closer stops the aggregator from aggregating throughput.
io.Closer

// RecordThroughput records throughput for an operation for aggregation.
RecordThroughput(service, operation, samplerType string, probability float64)

// Start starts aggregating operation throughput.
Start()
}
4 changes: 2 additions & 2 deletions cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ type SpanHandlers struct {
}

// BuildSpanProcessor builds the span processor to be used with the handlers
func (b *SpanHandlerBuilder) BuildSpanProcessor() processor.SpanProcessor {
func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) processor.SpanProcessor {
hostname, _ := os.Hostname()
svcMetrics := b.metricsFactory()
hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}})

return NewSpanProcessor(
b.SpanWriter,
additional,
Options.ServiceMetrics(svcMetrics),
Options.HostMetrics(hostMetrics),
Options.Logger(b.logger()),
Expand All @@ -61,7 +62,6 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor() processor.SpanProcessor {
Options.DynQueueSizeWarmup(uint(b.CollectorOpts.QueueSize)), // same as queue size for now
Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory),
)

}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
Expand Down
7 changes: 5 additions & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ type queueItem struct {
// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans
func NewSpanProcessor(
spanWriter spanstore.Writer,
additional []ProcessSpan,
opts ...Option,
) processor.SpanProcessor {
sp := newSpanProcessor(spanWriter, opts...)
sp := newSpanProcessor(spanWriter, additional, opts...)

sp.queue.StartConsumers(sp.numWorkers, func(item interface{}) {
value := item.(*queueItem)
Expand All @@ -84,7 +85,7 @@ func NewSpanProcessor(
return sp
}

func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcessor {
func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option) *spanProcessor {
options := Options.apply(opts...)
handlerMetrics := NewSpanProcessorMetrics(
options.serviceMetrics,
Expand Down Expand Up @@ -122,6 +123,8 @@ func newSpanProcessor(spanWriter spanstore.Writer, opts ...Option) *spanProcesso
processSpanFuncs = append(processSpanFuncs, sp.countSpan)
}

processSpanFuncs = append(processSpanFuncs, additional...)

sp.processSpan = ChainedProcessSpan(processSpanFuncs...)
return &sp
}
Expand Down
Loading