[receiver/elasticapmintake] Group events to avoid duplicate resource and scope spans #1214
[For reviewers] I have intentionally left out metrics from grouping optimizations as it is not straightforward.
This reverts commit c241d68.
Pull request overview
This PR reduces per-event allocations in the Elastic APM intake receiver by grouping trace and log events that share the same resource attributes into a single ResourceSpans / ResourceLogs per processBatch call, using an xxhash-based resource fingerprint derived from a shared resource-attribute walker.
Changes:
- Added per-batch resource grouping (`signalGroups`) and a stable resource fingerprint, reusing `ScopeSpans`/`ScopeLogs` for identical resources.
- Introduced `mappers.WalkResourceAttributes` as the single source of truth for both resource attribute writes and fingerprinting.
- Added/updated benchmarks and updated golden testdata YAML outputs to reflect the new grouping behavior.
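The per-batch grouping described in the list above can be sketched roughly as follows. This is an illustration only, not the PR's code: the `event` and `group` types and the `groupBatch` function are hypothetical stand-ins, and FNV-1a from the standard library replaces xxhash so the example has no external dependencies.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// event is a hypothetical stand-in for a decoded intake event.
type event struct {
	ServiceName string
	AgentName   string
	Name        string
}

// group is a hypothetical stand-in for one ResourceSpans with a reused scope.
type group struct {
	Resource map[string]string
	Spans    []string
}

// fingerprint hashes the fields that feed the resource map.
// FNV-1a keeps the sketch stdlib-only; the PR itself uses xxhash.
func fingerprint(e event) uint64 {
	h := fnv.New64a()
	for _, kv := range [][2]string{
		{"service.name", e.ServiceName},
		{"agent.name", e.AgentName},
	} {
		h.Write([]byte(kv[0]))
		h.Write([]byte{0}) // separator so "ab"+"c" and "a"+"bc" differ
		h.Write([]byte(kv[1]))
		h.Write([]byte{0})
	}
	return h.Sum64()
}

// groupBatch returns one group per distinct resource, in first-seen order.
// The cache lives only for the duration of one batch.
func groupBatch(events []event) []*group {
	byFingerprint := make(map[uint64]*group)
	var out []*group
	for _, e := range events {
		fp := fingerprint(e)
		g, ok := byFingerprint[fp]
		if !ok {
			// First event with this resource: build the resource once.
			g = &group{Resource: map[string]string{
				"service.name": e.ServiceName,
				"agent.name":   e.AgentName,
			}}
			byFingerprint[fp] = g
			out = append(out, g)
		}
		g.Spans = append(g.Spans, e.Name)
	}
	return out
}

func main() {
	groups := groupBatch([]event{
		{"checkout", "go", "GET /cart"},
		{"checkout", "go", "SELECT *"},
		{"billing", "java", "POST /pay"},
	})
	for _, g := range groups {
		fmt.Println(g.Resource["service.name"], len(g.Spans))
	}
}
```

In this sketch the resource map is built only when a fingerprint is first seen, which is where the allocation saving would come from when many events in a batch share agent metadata.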
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| receiver/elasticapmintakereceiver/receiver.go | Uses resource fingerprint + per-batch caches to group trace/log events by resource; routes span/log conversion via cached scopes; switches resource attribute mapping to the walker visitor. |
| receiver/elasticapmintakereceiver/resource_grouping.go | Implements signalGroups caches and the xxhash(v2) resource fingerprint visitor. |
| receiver/elasticapmintakereceiver/resource_grouping_test.go | Adds a unit test ensuring numeric label float values don’t incorrectly merge resources. |
| receiver/elasticapmintakereceiver/internal/mappers/resource_walker.go | Adds WalkResourceAttributes walker + ResourceAttrVisitor to drive both hashing and pcommon writes from one field list. |
| receiver/elasticapmintakereceiver/internal/mappers/intakeV2ToSemConv.go | Removes the old resource-attribute translation function in favor of the new walker-based approach. |
| receiver/elasticapmintakereceiver/internal/mappers/intakeV2ToElasticSpecificFields.go | Removes elastic-specific resource attribute mapping + label mapping (now handled by the walker). |
| receiver/elasticapmintakereceiver/internal/mappers/intakeV2ToDerivedFields.go | Removes derived resource attributes for agent name/version (now handled by the walker). |
| receiver/elasticapmintakereceiver/receiver_bench_test.go | Adds direct-path HandleStream* benchmark suite and synthetic payload generators. |
| receiver/elasticapmintakereceiver/go.mod | Promotes github.com/cespare/xxhash/v2 to a direct dependency. |
| receiver/elasticapmintakereceiver/testdata/unknown-span-type_expected.yaml | Updates expected output to reflect grouped resource spans and reordered resource attributes. |
| receiver/elasticapmintakereceiver/testdata/transactions_spans_expected.yaml | Updates expected output to reflect grouped resource spans and reordered resource attributes. |
| receiver/elasticapmintakereceiver/testdata/transactions_expected.yaml | Updates expected output to reflect grouped resource spans and reordered resource attributes. |
| receiver/elasticapmintakereceiver/testdata/spans_representative_count_expected.yaml | Updates expected output to reflect resource grouping (removes duplicated resources). |
| receiver/elasticapmintakereceiver/testdata/spans_expected.yaml | Updates expected output to reflect grouped resource spans and reordered resource attributes. |
| receiver/elasticapmintakereceiver/testdata/span-links_expected.yaml | Updates expected output to reflect resource grouping (removes duplicated resources). |
| receiver/elasticapmintakereceiver/testdata/logs_expected.yaml | Updates expected output to reflect grouped resource logs and reordered resource attributes/log records. |
| receiver/elasticapmintakereceiver/testdata/invalid_ids_expected.yaml | Updates expected output to reflect resource grouping (removes duplicated resources). |
| receiver/elasticapmintakereceiver/testdata/hostdata_expected.yaml | Updates expected output to reflect resource grouping/reordering of resource attributes. |
| receiver/elasticapmintakereceiver/testdata/errors_expected.yaml | Updates expected output to reflect grouped resource logs and reordered resource attributes/log records. |
// Including the key in the hash makes write order irrelevant for fields
// the visitor sees as Put*(key, value) — re-ordering the walker visits
// would produce the same fingerprint for the same set of attributes.
		fmt.Sprintf("tx%014x", base+uint64(i)),
		fmt.Sprintf("tx%014xtx%014x", base+uint64(i), base+uint64(i)),
		1_000_000+(base+uint64(i))*1_000,
	)
}
for i := range 8 {
	fmt.Fprintf(&buf,
		`{"span": {"id": %q, "trace_id": %q, "transaction_id": %q, "parent_id": %q, "name": "SELECT *", "type": "db.postgresql.query", "start": 1, "duration": 2, "timestamp": %d}}`+"\n",
		fmt.Sprintf("sp%014x", base+uint64(i)),
		fmt.Sprintf("tx%014xtx%014x", base+uint64(i), base+uint64(i)),
		fmt.Sprintf("tx%014x", base+uint64(i)),
		fmt.Sprintf("tx%014x", base+uint64(i)),
		1_000_000+(base+uint64(i))*1_000+1,
	)
}
fmt.Fprintf(&buf,
	`{"error": {"id": %q, "trace_id": %q, "transaction_id": %q, "parent_id": %q, "timestamp": %d, "log": {"message": "boom"}}}`+"\n",
	fmt.Sprintf("er%014x", base),
	fmt.Sprintf("tx%014xtx%014x", base, base),
	fmt.Sprintf("tx%014x", base),
	fmt.Sprintf("tx%014x", base),
carsonip
left a comment
LGTM, thanks. The approach is sound. There is a risk of hash collision, as discussed during the private sync, but the risk is low.
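For a rough sense of how low, the birthday bound gives an estimate. The sketch below assumes the 64-bit fingerprints are uniformly distributed and that collisions only matter among the distinct resources seen within one batch; `collisionProb` and the batch sizes are hypothetical and not taken from the PR.

```go
package main

import (
	"fmt"
	"math"
)

// collisionProb approximates the chance that at least two of n distinct
// resources share a fingerprint of the given bit width, assuming uniformly
// distributed hashes: p ≈ 1 - exp(-n(n-1) / 2^(bits+1)).
func collisionProb(n, bits float64) float64 {
	// Expm1 keeps precision when the probability is tiny.
	return -math.Expm1(-n * (n - 1) / math.Exp2(bits+1))
}

func main() {
	for _, n := range []float64{10, 1000, 1e6} {
		fmt.Printf("distinct resources=%g  p≈%.3g\n", n, collisionProb(n, 64))
	}
}
```

Under these assumptions, a batch with 1000 distinct resources has a collision probability on the order of 1e-14.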
if k == "" || nv == nil {
	continue
}
v.PutDouble("numeric_labels."+k, nv.Value)
q: not related to this PR but is it true that numeric labels always only use .Value, not .Values? Asking because there is .Values handling in apm-data for numeric labels.
If this turns out to be a bug I'm happy to defer it in a different PR to keep this PR clean.
Intake v2 always produces only .Value: input/elasticapm/internal/modeldecoder always decodes into .Value. I think .Values is used only for OTel via APM, which this receiver doesn't need to deal with.
Summary
Group trace and log events that share a resource attribute set into a single `ResourceSpans`/`ResourceLogs` per `processBatch` call. The grouping key is an xxhash fingerprint of the event fields that affect the resource map.
Resource-attribute writes and the fingerprint hash are both implemented as visitors over a single walker (`mappers.WalkResourceAttributes`), so adding a new resource field is a one-place edit that both paths pick up automatically. Metric events are not collapsed (doing so would risk duplicate metric names within a `ScopeMetrics`); a follow-up will handle them.
Motivation
Profiling the intake hot path showed every event allocating its own `ResourceSpans`/`ResourceLogs` plus a fresh resource attribute map, even when consecutive events came from the same agent metadata. `pcommon.Map.PutStr` boxing on identical-resource fan-out dominated per-event allocations.
The walker pattern was added to make the change safe to maintain: keeping a fingerprint and a resource-attribute writer manually in sync is exactly the kind of two-list-drift bug that produces silent data loss (events with different values for an unhashed field get merged, and the second event's value is dropped via `Map.PutStr`'s update-on-existing semantics). One walker makes the field set the single source of truth.
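A minimal sketch of the one-walker, two-visitors idea follows. It is not the PR's implementation: `attrVisitor`, `metadata`, `walk`, `mapWriter`, and `hasher` are hypothetical names, a plain Go map stands in for `pcommon.Map`, and FNV-1a stands in for xxhash. Only the `PutDouble("numeric_labels."+k, ...)` call shape is taken from the diff.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash"
	"hash/fnv"
	"math"
	"sort"
)

// attrVisitor is a hypothetical, trimmed-down stand-in for ResourceAttrVisitor.
type attrVisitor interface {
	PutStr(key, value string)
	PutDouble(key string, value float64)
}

// metadata is a hypothetical subset of the event fields that feed the resource.
type metadata struct {
	ServiceName   string
	AgentName     string
	NumericLabels map[string]float64
}

// walk is the single field list: each resource field is visited here once,
// so the writer and the hasher cannot drift apart.
func walk(m metadata, v attrVisitor) {
	v.PutStr("service.name", m.ServiceName)
	v.PutStr("agent.name", m.AgentName)
	keys := make([]string, 0, len(m.NumericLabels))
	for k := range m.NumericLabels {
		keys = append(keys, k)
	}
	sort.Strings(keys) // Go map iteration order is random; sort for a stable hash
	for _, k := range keys {
		v.PutDouble("numeric_labels."+k, m.NumericLabels[k])
	}
}

// mapWriter records attributes, playing the role of the resource map writer.
type mapWriter map[string]any

func (w mapWriter) PutStr(k, v string)            { w[k] = v }
func (w mapWriter) PutDouble(k string, v float64) { w[k] = v }

// hasher folds the same visits into a 64-bit fingerprint.
type hasher struct{ h hash.Hash64 }

func (x hasher) PutStr(k, v string) {
	x.h.Write([]byte(k + "\x00" + v + "\x00"))
}

func (x hasher) PutDouble(k string, v float64) {
	// Hash the exact bit pattern so 1 and 1.5 never look alike.
	var b [8]byte
	binary.LittleEndian.PutUint64(b[:], math.Float64bits(v))
	x.h.Write([]byte(k + "\x00"))
	x.h.Write(b[:])
}

func fingerprint(m metadata) uint64 {
	x := hasher{h: fnv.New64a()}
	walk(m, x)
	return x.h.Sum64()
}

func main() {
	a := metadata{"checkout", "go", map[string]float64{"shard": 1}}
	b := metadata{"checkout", "go", map[string]float64{"shard": 1.5}}
	attrs := mapWriter{}
	walk(a, attrs)
	fmt.Println(len(attrs))                       // attributes written for a
	fmt.Println(fingerprint(a) == fingerprint(b)) // whether a and b would merge
}
```

Adding a field to `walk` in this sketch changes both the written attributes and the fingerprint at once, which is the property the paragraph above relies on.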
Benchmark
`benchstat` of `BenchmarkProcessBatch` + `BenchmarkHandleStream*` (the new direct-path bench suite this PR adds) on `origin/main` vs this branch. 10 runs each, `-benchtime=2s`, Apple M4 Pro, Go 1.25.
Allocations per op (geomean −12.20%)
Bytes per op (geomean −3.24%)
Time per op (geomean −0.92%)
Throughput (B/s) for HandleStream* benches (geomean +1.47%)
Notes:
the collapse optimises). On 1000 transactions: −24.7% allocs, −16.9%
B/op, −8.5% ns/op, +9.3% throughput.
`metricsets`, `histograms`, `metric_global_label_shadow`) sit within ±2% on allocs as expected; the metric path is intentionally not collapsed.
`HandleStream/transactions` +3.65%) come from the walker's visitor interface dispatch and the labels-sort pass; they're real but small, and
the alloc reduction more than compensates on workloads where multiple
events actually share a resource (i.e. anything beyond a 5-event
fixture).