Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ See <<apm-server-configuration>> for more information.
- Fix infinite loop in tail-based sampling subscriber causing high CPU and repeated Elasticsearch searches {pull}7211[7211]
- Do not overwrite `service version` if no transaction/error/... specific `service.name` is givven {pull}7281[7281]
- Fix panic when processing OpenTelemetry histogram metrics without bounds {pull}7316[7316]
- Fix waiting for events to be flushed when shutting down APM Server {pull}7352[7352]

[float]
==== Intake API Changes
Expand Down
67 changes: 35 additions & 32 deletions model/modelindexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ type Indexer struct {
available chan *bulkIndexer
g errgroup.Group

mu sync.RWMutex
closing bool
closed chan struct{}
activeMu sync.Mutex
active *bulkIndexer
timer *time.Timer
mu sync.RWMutex
closing bool
closed chan struct{}
activeMu sync.Mutex
active *bulkIndexer
timer *time.Timer
timerStopped chan struct{}
}

// Config holds configuration for Indexer.
Expand Down Expand Up @@ -142,10 +143,11 @@ func New(client elasticsearch.Client, cfg Config) (*Indexer, error) {
available <- newBulkIndexer(client, cfg.CompressionLevel)
}
return &Indexer{
config: cfg,
logger: logger,
available: available,
closed: make(chan struct{}),
config: cfg,
logger: logger,
available: available,
closed: make(chan struct{}),
timerStopped: make(chan struct{}),
}, nil
}

Expand All @@ -161,7 +163,7 @@ func (i *Indexer) Close(ctx context.Context) error {
i.closing = true

// Close i.closed when ctx is cancelled,
// unblock any ongoing flush attempts.
// unblocking any ongoing flush attempts.
done := make(chan struct{})
defer close(done)
go func() {
Expand All @@ -173,10 +175,10 @@ func (i *Indexer) Close(ctx context.Context) error {
}()

i.activeMu.Lock()
defer i.activeMu.Unlock()
if i.active != nil && i.timer.Stop() {
i.flushActiveLocked(ctx)
i.timerStopped <- struct{}{}
}
i.activeMu.Unlock()
}
return i.g.Wait()
}
Expand Down Expand Up @@ -235,13 +237,20 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error
case i.active = <-i.available:
}
if i.timer == nil {
i.timer = time.AfterFunc(
i.config.FlushInterval,
i.flushActive,
)
i.timer = time.NewTimer(i.config.FlushInterval)
} else {
i.timer.Reset(i.config.FlushInterval)
}
i.g.Go(func() error {
// The timer may be stopped by i.Close or when
// i.config.FlushBytes is exceeded, in which case
// i.timerStopped will be signalled.
select {
case <-i.timerStopped:
case <-i.timer.C:
}
return i.flushActive(context.Background())
})
}

if err := i.active.Add(elasticsearch.BulkIndexerItem{
Expand All @@ -256,7 +265,7 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error

if i.active.Len() >= i.config.FlushBytes {
if i.timer.Stop() {
i.flushActiveLocked(context.Background())
i.timerStopped <- struct{}{}
}
}
return nil
Expand Down Expand Up @@ -309,13 +318,7 @@ func encodeMap(v map[string]interface{}, out *fastjson.Writer) error {
return nil
}

func (i *Indexer) flushActive() {
i.activeMu.Lock()
defer i.activeMu.Unlock()
i.flushActiveLocked(context.Background())
}

func (i *Indexer) flushActiveLocked(ctx context.Context) {
func (i *Indexer) flushActive(ctx context.Context) error {
// Create a child context which is cancelled when the context passed to i.Close is cancelled.
flushed := make(chan struct{})
Comment thread
axw marked this conversation as resolved.
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -326,15 +329,15 @@ func (i *Indexer) flushActiveLocked(ctx context.Context) {
case <-flushed:
}
}()

i.activeMu.Lock()
defer i.activeMu.Unlock()
bulkIndexer := i.active
i.active = nil
i.g.Go(func() error {
defer close(flushed)
err := i.flush(ctx, bulkIndexer)
bulkIndexer.Reset()
i.available <- bulkIndexer
return err
})
err := i.flush(ctx, bulkIndexer)
bulkIndexer.Reset()
i.available <- bulkIndexer
return err
}

func (i *Indexer) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
Expand Down
8 changes: 6 additions & 2 deletions model/modelindexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestModelIndexerServerError(t *testing.T) {
}

func TestModelIndexerServerErrorTooManyRequests(t *testing.T) {
client := newMockElasticsearchClient(t, func(w http.ResponseWriter, _ *http.Request) {
client := newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTooManyRequests)
})
indexer, err := modelindexer.New(client, modelindexer.Config{FlushInterval: time.Minute})
Expand Down Expand Up @@ -356,7 +356,9 @@ func TestModelIndexerCloseFlushContext(t *testing.T) {
case <-r.Context().Done():
}
})
indexer, err := modelindexer.New(client, modelindexer.Config{})
indexer, err := modelindexer.New(client, modelindexer.Config{
FlushInterval: time.Millisecond,
})
require.NoError(t, err)
defer indexer.Close(context.Background())

Expand Down Expand Up @@ -579,6 +581,8 @@ func newMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) elas

config := elasticsearch.DefaultConfig()
config.Hosts = elasticsearch.Hosts{srv.URL}
config.Backoff.Max = time.Nanosecond

client, err := elasticsearch.NewClient(config)
require.NoError(t, err)
return client
Expand Down