diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index 5389268ea21..0bf73699003 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -17,6 +17,7 @@ See <> for more information. - Do not overwrite `service version` if no transaction/error/... specific `service.name` is givven {pull}7281[7281] - Fix panic when processing OpenTelemetry histogram metrics without bounds {pull}7316[7316] - Fix mixing of labels across OpenTelemetry log records {pull}7358[7358] +- Fix waiting for events to be flushed when shutting down APM Server {pull}7352[7352] [float] ==== Intake API Changes diff --git a/model/modelindexer/indexer.go b/model/modelindexer/indexer.go index 84eebecd39a..a0b2aa6c4f8 100644 --- a/model/modelindexer/indexer.go +++ b/model/modelindexer/indexer.go @@ -80,12 +80,13 @@ type Indexer struct { available chan *bulkIndexer g errgroup.Group - mu sync.RWMutex - closing bool - closed chan struct{} - activeMu sync.Mutex - active *bulkIndexer - timer *time.Timer + mu sync.RWMutex + closing bool + closed chan struct{} + activeMu sync.Mutex + active *bulkIndexer + timer *time.Timer + timerStopped chan struct{} } // Config holds configuration for Indexer. @@ -142,10 +143,11 @@ func New(client elasticsearch.Client, cfg Config) (*Indexer, error) { available <- newBulkIndexer(client, cfg.CompressionLevel) } return &Indexer{ - config: cfg, - logger: logger, - available: available, - closed: make(chan struct{}), + config: cfg, + logger: logger, + available: available, + closed: make(chan struct{}), + timerStopped: make(chan struct{}), }, nil } @@ -161,7 +163,7 @@ func (i *Indexer) Close(ctx context.Context) error { i.closing = true // Close i.closed when ctx is cancelled, - // unblock any ongoing flush attempts. + // unblocking any ongoing flush attempts. done := make(chan struct{}) defer close(done) go func() { @@ -173,10 +175,10 @@ func (i *Indexer) Close(ctx context.Context) error { }() i.activeMu.Lock() - defer i.activeMu.Unlock() if i.active != nil && i.timer.Stop() { - i.flushActiveLocked(ctx) + i.timerStopped <- struct{}{} } + i.activeMu.Unlock() } return i.g.Wait() } @@ -235,13 +237,20 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error case i.active = <-i.available: } if i.timer == nil { - i.timer = time.AfterFunc( - i.config.FlushInterval, - i.flushActive, - ) + i.timer = time.NewTimer(i.config.FlushInterval) } else { i.timer.Reset(i.config.FlushInterval) } + i.g.Go(func() error { + // The timer may be stopped by i.Close or when + // i.config.FlushBytes is exceeded, in which case + // i.timerStopped will be signalled. + select { + case <-i.timerStopped: + case <-i.timer.C: + } + return i.flushActive(context.Background()) + }) } if err := i.active.Add(elasticsearch.BulkIndexerItem{ @@ -256,7 +265,7 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error if i.active.Len() >= i.config.FlushBytes { if i.timer.Stop() { - i.flushActiveLocked(context.Background()) + i.timerStopped <- struct{}{} } } return nil @@ -309,15 +318,10 @@ func encodeMap(v map[string]interface{}, out *fastjson.Writer) error { return nil } -func (i *Indexer) flushActive() { - i.activeMu.Lock() - defer i.activeMu.Unlock() - i.flushActiveLocked(context.Background()) -} - -func (i *Indexer) flushActiveLocked(ctx context.Context) { +func (i *Indexer) flushActive(ctx context.Context) error { // Create a child context which is cancelled when the context passed to i.Close is cancelled. flushed := make(chan struct{}) + defer close(flushed) ctx, cancel := context.WithCancel(ctx) go func() { defer cancel() @@ -326,15 +330,15 @@ func (i *Indexer) flushActiveLocked(ctx context.Context) { case <-flushed: } }() + + i.activeMu.Lock() + defer i.activeMu.Unlock() bulkIndexer := i.active i.active = nil - i.g.Go(func() error { - defer close(flushed) - err := i.flush(ctx, bulkIndexer) - bulkIndexer.Reset() - i.available <- bulkIndexer - return err - }) + err := i.flush(ctx, bulkIndexer) + bulkIndexer.Reset() + i.available <- bulkIndexer + return err } func (i *Indexer) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { diff --git a/model/modelindexer/indexer_test.go b/model/modelindexer/indexer_test.go index 3054bffbc3c..88b71007076 100644 --- a/model/modelindexer/indexer_test.go +++ b/model/modelindexer/indexer_test.go @@ -25,6 +25,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "runtime" "strings" "sync/atomic" "testing" @@ -33,6 +34,7 @@ import ( "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.elastic.co/apm" "go.elastic.co/apm/apmtest" "go.elastic.co/fastjson" @@ -44,6 +46,10 @@ import ( "github.com/elastic/apm-server/model/modelindexer" ) +func init() { + apm.DefaultTracer.Close() +} + func TestModelIndexer(t *testing.T) { var indexed int64 client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { @@ -266,7 +272,7 @@ func TestModelIndexerServerError(t *testing.T) { } func TestModelIndexerServerErrorTooManyRequests(t *testing.T) { - client := newMockElasticsearchClient(t, func(w http.ResponseWriter, _ *http.Request) { + client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusTooManyRequests) }) indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Minute}) @@ -356,7 +362,9 @@ func TestModelIndexerCloseFlushContext(t *testing.T) { case <-r.Context().Done(): } }) - indexer, err := modelindexer.New(client, modelindexer.Config{}) + indexer, err := modelindexer.New(client, modelindexer.Config{ + FlushInterval: time.Millisecond, + }) require.NoError(t, err) defer indexer.Close(context.Background()) @@ -391,6 +399,40 @@ func TestModelIndexerCloseFlushContext(t *testing.T) { } } +func TestModelIndexerFlushGoroutineStopped(t *testing.T) { + bulkHandler := func(w http.ResponseWriter, r *http.Request) {} + config := newMockElasticsearchClientConfig(t, bulkHandler) + httpTransport, _ := elasticsearch.NewHTTPTransport(config) + httpTransport.DisableKeepAlives = true // disable to avoid persistent conn goroutines + client, _ := elasticsearch.NewClientParams(elasticsearch.ClientParams{ + Config: config, + Transport: httpTransport, + }) + + indexer, err := modelindexer.New(client, modelindexer.Config{FlushBytes: 1}) + require.NoError(t, err) + defer indexer.Close(context.Background()) + + before := runtime.NumGoroutine() + for i := 0; i < 100; i++ { + batch := model.Batch{model.APMEvent{Timestamp: time.Now(), DataStream: model.DataStream{ + Type: "logs", + Dataset: "apm_server", + Namespace: "testing", + }}} + err := indexer.ProcessBatch(context.Background(), &batch) + require.NoError(t, err) + } + + var after int + deadline := time.Now().Add(10 * time.Second) + for after != before && time.Now().Before(deadline) { + time.Sleep(100 * time.Millisecond) + after = runtime.NumGoroutine() + } + assert.Equal(t, before, after, "Leaked %d goroutines", after-before) +} + func TestModelIndexerUnknownResponseFields(t *testing.T) { client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") @@ -442,6 +484,7 @@ func testModelIndexerTracing(t *testing.T, statusCode int, expectedOutcome strin }) tracer := apmtest.NewRecordingTracer() + defer tracer.Close() indexer, err := modelindexer.New(client, modelindexer.Config{ FlushInterval: time.Minute, Tracer: tracer.Tracer, @@ -568,6 +611,15 @@ func benchmarkModelIndexer(b *testing.B, compressionLevel int) { } func newMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) elasticsearch.Client { + config := newMockElasticsearchClientConfig(t, bulkHandler) + client, err := elasticsearch.NewClient(config) + require.NoError(t, err) + return client +} + +func newMockElasticsearchClientConfig( + t testing.TB, bulkHandler http.HandlerFunc, +) *elasticsearch.Config { mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") @@ -579,7 +631,6 @@ func newMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) elas config := elasticsearch.DefaultConfig() config.Hosts = elasticsearch.Hosts{srv.URL} - client, err := elasticsearch.NewClient(config) - require.NoError(t, err) - return client + config.Backoff.Max = time.Nanosecond + return config }