From d1923cc1e41ff2480a0c66e0e41798ab755f3264 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 21 Feb 2022 12:45:17 +0800 Subject: [PATCH 1/3] model/modelindexer: speed up tests Some tests may intentionally cause an error, which will lead to Elasticsearch client backoff. Set a fast backoff to not slow the tests. --- model/modelindexer/indexer_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/model/modelindexer/indexer_test.go b/model/modelindexer/indexer_test.go index 3054bffbc3c..3535deeb1ea 100644 --- a/model/modelindexer/indexer_test.go +++ b/model/modelindexer/indexer_test.go @@ -266,7 +266,7 @@ func TestModelIndexerServerError(t *testing.T) { } func TestModelIndexerServerErrorTooManyRequests(t *testing.T) { - client := newMockElasticsearchClient(t, func(w http.ResponseWriter, _ *http.Request) { + client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusTooManyRequests) }) indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Minute}) @@ -579,6 +579,8 @@ func newMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) elas config := elasticsearch.DefaultConfig() config.Hosts = elasticsearch.Hosts{srv.URL} + config.Backoff.Max = time.Nanosecond + client, err := elasticsearch.NewClient(config) require.NoError(t, err) return client From dc3ae44f4004aa46723fafc1c3ac3e1eed25631f Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 21 Feb 2022 13:58:39 +0800 Subject: [PATCH 2/3] model/modelindexer: fix flush-on-close If the background timer goroutine was started, but had not yet added to the errgroup, then Close could return before events enqueued by ProcessEvents were flushed. We fix this by ensuring the errgroup.Go call is made before ProcessEvents returned. If the timer is stopped (e.g. because Close is called), then we signal to the timer goroutine to flush immediately, instead of calling flushActive(Locked) from multiple code paths. --- changelogs/head.asciidoc | 1 + model/modelindexer/indexer.go | 67 ++++++++++++++++-------------- model/modelindexer/indexer_test.go | 4 +- 3 files changed, 39 insertions(+), 33 deletions(-) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index b8f707d17b7..ce5b7c533b8 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -16,6 +16,7 @@ See <> for more information. - Fix infinite loop in tail-based sampling subscriber causing high CPU and repeated Elasticsearch searches {pull}7211[7211] - Do not overwrite `service version` if no transaction/error/... specific `service.name` is givven {pull}7281[7281] - Fix panic when processing OpenTelemetry histogram metrics without bounds {pull}7316[7316] +- Fix waiting for events to be flushed when shutting down APM Server {pull}7352[7352] [float] ==== Intake API Changes diff --git a/model/modelindexer/indexer.go b/model/modelindexer/indexer.go index 84eebecd39a..979135819a0 100644 --- a/model/modelindexer/indexer.go +++ b/model/modelindexer/indexer.go @@ -80,12 +80,13 @@ type Indexer struct { available chan *bulkIndexer g errgroup.Group - mu sync.RWMutex - closing bool - closed chan struct{} - activeMu sync.Mutex - active *bulkIndexer - timer *time.Timer + mu sync.RWMutex + closing bool + closed chan struct{} + activeMu sync.Mutex + active *bulkIndexer + timer *time.Timer + timerStopped chan struct{} } // Config holds configuration for Indexer. @@ -142,10 +143,11 @@ func New(client elasticsearch.Client, cfg Config) (*Indexer, error) { available <- newBulkIndexer(client, cfg.CompressionLevel) } return &Indexer{ - config: cfg, - logger: logger, - available: available, - closed: make(chan struct{}), + config: cfg, + logger: logger, + available: available, + closed: make(chan struct{}), + timerStopped: make(chan struct{}), }, nil } @@ -161,7 +163,7 @@ func (i *Indexer) Close(ctx context.Context) error { i.closing = true // Close i.closed when ctx is cancelled, - // unblock any ongoing flush attempts. + // unblocking any ongoing flush attempts. done := make(chan struct{}) defer close(done) go func() { @@ -173,10 +175,10 @@ func (i *Indexer) Close(ctx context.Context) error { }() i.activeMu.Lock() - defer i.activeMu.Unlock() if i.active != nil && i.timer.Stop() { - i.flushActiveLocked(ctx) + i.timerStopped <- struct{}{} } + i.activeMu.Unlock() } return i.g.Wait() } @@ -235,13 +237,20 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error case i.active = <-i.available: } if i.timer == nil { - i.timer = time.AfterFunc( - i.config.FlushInterval, - i.flushActive, - ) + i.timer = time.NewTimer(i.config.FlushInterval) } else { i.timer.Reset(i.config.FlushInterval) } + i.g.Go(func() error { + // The timer may be stopped by i.Close or when + // i.config.FlushBytes is exceeded, in which case + // i.timerStopped will be signalled. + select { + case <-i.timerStopped: + case <-i.timer.C: + } + return i.flushActive(context.Background()) + }) } if err := i.active.Add(elasticsearch.BulkIndexerItem{ @@ -256,7 +265,7 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error if i.active.Len() >= i.config.FlushBytes { if i.timer.Stop() { - i.flushActiveLocked(context.Background()) + i.timerStopped <- struct{}{} } } return nil @@ -309,13 +318,7 @@ func encodeMap(v map[string]interface{}, out *fastjson.Writer) error { return nil } -func (i *Indexer) flushActive() { - i.activeMu.Lock() - defer i.activeMu.Unlock() - i.flushActiveLocked(context.Background()) -} - -func (i *Indexer) flushActiveLocked(ctx context.Context) { +func (i *Indexer) flushActive(ctx context.Context) error { // Create a child context which is cancelled when the context passed to i.Close is cancelled. flushed := make(chan struct{}) ctx, cancel := context.WithCancel(ctx) @@ -326,15 +329,15 @@ func (i *Indexer) flushActiveLocked(ctx context.Context) { case <-flushed: } }() + + i.activeMu.Lock() + defer i.activeMu.Unlock() bulkIndexer := i.active i.active = nil - i.g.Go(func() error { - defer close(flushed) - err := i.flush(ctx, bulkIndexer) - bulkIndexer.Reset() - i.available <- bulkIndexer - return err - }) + err := i.flush(ctx, bulkIndexer) + bulkIndexer.Reset() + i.available <- bulkIndexer + return err } func (i *Indexer) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { diff --git a/model/modelindexer/indexer_test.go b/model/modelindexer/indexer_test.go index 3535deeb1ea..703370b3a90 100644 --- a/model/modelindexer/indexer_test.go +++ b/model/modelindexer/indexer_test.go @@ -356,7 +356,9 @@ func TestModelIndexerCloseFlushContext(t *testing.T) { case <-r.Context().Done(): } }) - indexer, err := modelindexer.New(client, modelindexer.Config{}) + indexer, err := modelindexer.New(client, modelindexer.Config{ + FlushInterval: time.Millisecond, + }) require.NoError(t, err) defer indexer.Close(context.Background()) From fe91243ff9952b0817a2feb0cf917af1365c11fc Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 21 Feb 2022 20:23:24 +0800 Subject: [PATCH 3/3] model/modelindexer: fix goroutine leak --- model/modelindexer/indexer.go | 1 + model/modelindexer/indexer_test.go | 55 +++++++++++++++++++++++++++--- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/model/modelindexer/indexer.go b/model/modelindexer/indexer.go index 979135819a0..a0b2aa6c4f8 100644 --- a/model/modelindexer/indexer.go +++ b/model/modelindexer/indexer.go @@ -321,6 +321,7 @@ func encodeMap(v map[string]interface{}, out *fastjson.Writer) error { func (i *Indexer) flushActive(ctx context.Context) error { // Create a child context which is cancelled when the context passed to i.Close is cancelled. flushed := make(chan struct{}) + defer close(flushed) ctx, cancel := context.WithCancel(ctx) go func() { defer cancel() diff --git a/model/modelindexer/indexer_test.go b/model/modelindexer/indexer_test.go index 703370b3a90..88b71007076 100644 --- a/model/modelindexer/indexer_test.go +++ b/model/modelindexer/indexer_test.go @@ -25,6 +25,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "runtime" "strings" "sync/atomic" "testing" @@ -33,6 +34,7 @@ import ( "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.elastic.co/apm" "go.elastic.co/apm/apmtest" "go.elastic.co/fastjson" @@ -44,6 +46,10 @@ import ( "github.com/elastic/apm-server/model/modelindexer" ) +func init() { + apm.DefaultTracer.Close() +} + func TestModelIndexer(t *testing.T) { var indexed int64 client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { @@ -393,6 +399,40 @@ func TestModelIndexerCloseFlushContext(t *testing.T) { } } +func TestModelIndexerFlushGoroutineStopped(t *testing.T) { + bulkHandler := func(w http.ResponseWriter, r *http.Request) {} + config := newMockElasticsearchClientConfig(t, bulkHandler) + httpTransport, _ := elasticsearch.NewHTTPTransport(config) + httpTransport.DisableKeepAlives = true // disable to avoid persistent conn goroutines + client, _ := elasticsearch.NewClientParams(elasticsearch.ClientParams{ + Config: config, + Transport: httpTransport, + }) + + indexer, err := modelindexer.New(client, modelindexer.Config{FlushBytes: 1}) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + before := runtime.NumGoroutine() + for i := 0; i < 100; i++ { + batch := model.Batch{model.APMEvent{Timestamp: time.Now(), DataStream: model.DataStream{ + Type: "logs", + Dataset: "apm_server", + Namespace: "testing", + }}} + err := indexer.ProcessBatch(context.Background(), &batch) + require.NoError(t, err) + } + + var after int + deadline := time.Now().Add(10 * time.Second) + for after != before && time.Now().Before(deadline) { + time.Sleep(100 * time.Millisecond) + after = runtime.NumGoroutine() + } + assert.Equal(t, before, after, "Leaked %d goroutines", after-before) +} + func TestModelIndexerUnknownResponseFields(t *testing.T) { client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") @@ -444,6 +484,7 @@ func testModelIndexerTracing(t *testing.T, statusCode int, expectedOutcome strin }) tracer := apmtest.NewRecordingTracer() + defer tracer.Close() indexer, err := modelindexer.New(client, modelindexer.Config{ FlushInterval: time.Minute, Tracer: tracer.Tracer, @@ -570,6 +611,15 @@ func benchmarkModelIndexer(b *testing.B, compressionLevel int) { } func newMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) elasticsearch.Client { + config := newMockElasticsearchClientConfig(t, bulkHandler) + client, err := elasticsearch.NewClient(config) + require.NoError(t, err) + return client +} + +func newMockElasticsearchClientConfig( + t testing.TB, bulkHandler http.HandlerFunc, +) *elasticsearch.Config { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") @@ -582,8 +632,5 @@ func newMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) elas config := elasticsearch.DefaultConfig() config.Hosts = elasticsearch.Hosts{srv.URL} config.Backoff.Max = time.Nanosecond - - client, err := elasticsearch.NewClient(config) - require.NoError(t, err) - return client + return config }