[chore][exporter/elasticsearch] Add downsampling for profiling events #37893

Open · wants to merge 9 commits into main

Changes from 3 commits
27 changes: 27 additions & 0 deletions .chloggen/elasticsearch-profiles-downsampling.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add stratified downsampling to the profiles support in the elasticsearch exporter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37893]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
Expand Up @@ -22,52 +22,45 @@ const (

// SerializeProfile serializes a profile and calls the `pushData` callback for each generated document.
func SerializeProfile(resource pcommon.Resource, scope pcommon.InstrumentationScope, profile pprofile.Profile, pushData func(*bytes.Buffer, string, string) error) error {
	pushDataAsJSON := func(data any, id, index string) error {
		c, err := toJSON(data)
		if err != nil {
			return err
		}
		return pushData(c, id, index)
	}

	data, err := serializeprofiles.Transform(resource, scope, profile)
	if err != nil {
		return err
	}

	for _, payload := range data {
		event := payload.StackTraceEvent

		if event.StackTraceID != "" {
			if err = pushDataAsJSON(event, "", AllEventsIndex); err != nil {
				return err
			}
			if err = serializeprofiles.IndexDownsampledEvent(event, pushDataAsJSON); err != nil {
				return err
			}
		}

		if payload.StackTrace.DocID != "" {
			if err = pushDataAsJSON(payload.StackTrace, payload.StackTrace.DocID, StackTraceIndex); err != nil {
				return err
			}
		}

		for _, stackFrame := range payload.StackFrames {
			if err = pushDataAsJSON(stackFrame, stackFrame.DocID, StackFrameIndex); err != nil {
				return err
			}
		}

		for _, executable := range payload.Executables {
			if err = pushDataAsJSON(executable, executable.DocID, ExecutablesIndex); err != nil {
				return err
			}
		}
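A minimal sketch of how a caller can drive SerializeProfile; the collectDocs helper and its map-based collection are hypothetical (the real exporter feeds each document into its bulk indexer):

	// collectDocs groups the JSON documents emitted by SerializeProfile by target index.
	func collectDocs(res pcommon.Resource, scope pcommon.InstrumentationScope, prof pprofile.Profile) (map[string][]string, error) {
		docs := make(map[string][]string)
		err := SerializeProfile(res, scope, prof, func(buf *bytes.Buffer, docID, index string) error {
			// docID is empty for stacktrace events; stack traces, stack frames
			// and executables carry their own document IDs.
			docs[index] = append(docs[index], buf.String())
			return nil
		})
		return docs, err
	}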
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package serializeprofiles // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/serializer/otelserializer/serializeprofiles"

import (
"fmt"
"math/rand/v2"
)

// ## Why do we need downsampling?
// For every (virtual) CPU core, the host agent (HA) retrieves up to 20 stacktrace events per
// second, which are stored as a timeseries in an ES index (name 'profiling-events'). With an
// increasing number of hosts and/or cores, the number of stored events per second quickly
// becomes high. E.g. data from 10000 cores generates 846 million events per day.
// Since users want to drill down into e.g. single hosts and/or single applications, we can't
// reduce the amount of data in advance. Querying such amounts of data is costly, even when
// using highly specialised database backends - costly in terms of I/O and CPU. And this
// results in increased latency - the user may end up waiting a long time for query results.
// In order to reduce the costs and to keep the latency as low as possible, we add 'helper'
// indexes with downsampled subsets of the stacktrace events.
//
// ## How does our downsampling approach work?
// The idea is to create downsampled indexes with factors of 5^N (5, 25, 125, 625, ...).
// In the 5^1 index we would store 1/5th of the original events, in the 5^2 index we store
// 1/25th of the original events and so on.
// So each event has a probability of p=1/5=0.2 to also be stored in the next downsampled index.
// Since we aggregate identical stacktrace events by timestamp when reported and stored, we have
// a 'Count' value for each. To be statistically correct, we have to apply p=0.2 to each single
// event independently and not just to the aggregate. We can do so by looping over 'Count' and
// applying p=0.2 on every iteration to generate a new 'Count' value for the next downsampled index.
// We only store aggregates with 'Count' > 0.
//
// At some point we decided that 20k events per query is good enough. With 5^N it means that
// we can possibly end up with 5x more events (100k) from an index. As of this writing,
// retrieving and processing 100k events is still fast enough. While in ClickHouse we could
// further downsample on-the-fly to get 20k, ES currently doesn't allow this (may change in
// the future).
//
// At query time we have to find the index that has enough data to be statistically sound,
// without having too much data to avoid costs and latency. The code for that is implemented on
// the read path (Kibana profiler plugin) and described there in detail.
//
// ## Example of a query / calculation
// Let's imagine a given query spans a time range of 7 days and would result in 100 million
// events without downsampling. But we only really need 20k events for a good enough result.
// In the 5^1 downsampled index we have 5x less data - this still results in 20 million events.
// Going deeper, we end up in the 5^5 downsampled index with 32k results - 5^4 would give us
// 160k events (too many) and 5^6 would give us 6.4k events (not enough).
// We now read and process all 32k events from the 5^5 index. The counts for any aggregation
// (TopN, Flamegraph, ...) need to be multiplied by 5^5 = 3125, which gives an estimate of
// what we would have found in the full events index (the non-downsampled index).
//
// ## How deep do we have to downsample?
// The current code assumes an arbitrary upper limit of 100k CPU cores and a query time range
// of 7 days. (Please be aware that we get 20 events per core per second only if the core is
// 100% busy.)
//
// The condition is
//
//    (100k * 86400 * 7 * 20) / 5^N in [20k, 100k-1]
//                        ^-- max number of events per second
//                    ^------ number of days
//            ^-------------- seconds per day
//     ^--------------------- number of cores
//
// For N=11 the condition is satisfied with a value of 24772.
// In numbers, the 5^11 downsampled index holds 48828125x fewer entries than the full events table.
//
// ## What is the cost of downsampling?
// The additional cost in terms of storage size is
//
//    1/5^1 + 1/5^2 + ... + 1/5^11 ≈ 25%
//
// (a geometric series that converges to (1/5) / (1 - 1/5) = 25%).
// The same goes for the additional CPU cost on the write path.
//
// The average benefit on the read/query path depends on the query. But it seems that on
// average a factor of a few hundred to a few thousand in terms of I/O, CPU and latency can
// be achieved.
const (
MaxEventsIndexes = 11
SamplingFactor = 5
SamplingRatio = 1.0 / float64(SamplingFactor)

eventsIndexPrefix = "profiling-events"
)
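
// Illustrative sketch, not part of this change: index selection happens on the read
// path (Kibana profiler plugin), but expressed in Go it amounts to walking down the
// downsampled indexes until the estimated event count would drop below the minimum
// sample size. The full-index name and the minSamples parameter are assumptions.
func pickDownsampledIndex(estimatedFullEvents, minSamples float64) (index string, scale float64) {
	index, scale = "profiling-events-all", 1 // hypothetical name for the full events index
	for n := 1; n <= MaxEventsIndexes; n++ {
		if estimatedFullEvents*SamplingRatio < minSamples {
			break // going one level deeper would leave too few events
		}
		estimatedFullEvents *= SamplingRatio
		scale *= SamplingFactor
		index = fmt.Sprintf("%s-%dpow%02d", eventsIndexPrefix, SamplingFactor, n)
	}
	// Counts read from 'index' must be multiplied by 'scale', e.g. a query estimated
	// at 100 million events selects the 5^5 index and scales counts by 3125.
	return index, scale
}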

var eventIndices = initEventIndexes(MaxEventsIndexes)

// A fixed seed is used for deterministic tests and development.
// There is no downside in using a fixed seed in production.
var rnd = rand.New(rand.NewPCG(0, 0))

// initEventIndexes initializes eventIndexes to avoid calculations for every TraceEvent later.
func initEventIndexes(count int) []string {
indices := make([]string, 0, count)

for i := range count {
indices = append(indices, fmt.Sprintf("%s-%dpow%02d",
eventsIndexPrefix, SamplingFactor, i+1))
}

return indices
}
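
// Illustrative sketch, not part of this change: the sizing condition from the comment
// above ("How deep do we have to downsample?") can be checked numerically, e.g.
// expectedIndexEvents(100_000*86400*7*20, 11) returns 24772.608.
func expectedIndexEvents(fullIndexEvents float64, n int) float64 {
	// Each downsampling level keeps an expected fraction of SamplingRatio (1/5).
	for i := 0; i < n; i++ {
		fullIndexEvents *= SamplingRatio
	}
	return fullIndexEvents
}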

func IndexDownsampledEvent(event StackTraceEvent, pushData func(any, string, string) error) error {
origCount := event.Count
defer func() { event.Count = origCount }()

// Each event has a probability of p=1/5=0.2 to go from one index into the next downsampled
// index. Since we aggregate identical stacktrace events by timestamp when reported and stored,
// we have a 'Count' value for each. To be statistically correct, we have to apply p=0.2 to
// each single stacktrace event independently and not just to the aggregate. We can do so by
// looping over 'Count' and applying p=0.2 on every iteration to generate a new 'Count' value for
// the next downsampled index.
// We only store aggregates with 'Count' > 0. If 'Count' becomes 0, we are done and can
// continue with the next stacktrace event.
for _, index := range eventIndices {
var count uint16
for range event.Count {
// samplingRatio is the probability p=0.2 for an event to be copied into the next
// downsampled index.
if rnd.Float64() < SamplingRatio {
count++
}
}
if count == 0 {
return nil
}

// Store the event with its new downsampled count in the downsampled index.
event.Count = count

if err := pushData(event, "", index); err != nil {
return err
}
}

return nil
}
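
// Illustrative sketch, not part of this change: feeding one aggregated event through
// IndexDownsampledEvent. With Count=1000, the 5^1 index receives a Binomial(1000, 0.2)
// count (~200 on average), the 5^2 index ~40, and so on, until the count thins out to
// zero and indexing stops. The stacktrace ID is a made-up value.
func exampleIndexDownsampledEvent() error {
	event := StackTraceEvent{StackTraceID: "2Wc9jpQvSGeEEvmLKSVhhA", Count: 1000}
	return IndexDownsampledEvent(event, func(doc any, _, index string) error {
		fmt.Printf("%s <- Count=%d\n", index, doc.(StackTraceEvent).Count)
		return nil
	})
}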