feat(loadbalancingexporter): compressed queue payloads (SAW-6556 backport)#16
Conversation
|
You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds optional payload compression to the loadbalancing exporter: introduces exported QueueSettings with PayloadCompression and CompressInMemory, implements a queue payload codec (none/snappy/zstd), wires the codec into factory/resilience/shutdown flows, adds validation and tests, and updates docs and dependencies. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Queue
participant Codec
participant Transport
Client->>Queue: Enqueue(payload)
Queue->>Codec: Encode(payload, cfg.PayloadCompression, cfg.CompressInMemory)
alt PayloadCompression == snappy
Codec->>Codec: Compress with Snappy
else PayloadCompression == zstd
Codec->>Codec: Compress with Zstd
else PayloadCompression == none
Codec->>Codec: No compression
end
Codec->>Queue: Return encoded bytes (header + body)
Queue->>Transport: Send encoded payload
sequenceDiagram
participant Transport
participant Queue
participant Codec
participant Receiver
Transport->>Queue: Deliver encoded payload
Queue->>Codec: Decode(encoded bytes)
Codec->>Codec: Parse header (magic, version, codec ID)
alt codec ID == snappy
Codec->>Codec: Decompress with Snappy
else codec ID == zstd
Codec->>Codec: Decompress with Zstd
else codec ID == none
Codec->>Codec: No decompression
end
Codec->>Receiver: Return original payload
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 golangci-lint (2.5.0)level=error msg="Running error: context loading failed: no go files to analyze: running Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
[ARCH-REVIEW] REQUEST_CHANGES — 1 concern, 2 nits
Clean backport of sending_queue.payload_compression to the loadbalancing exporter. Config validation, codec framing (magic+version+codecID), and roundtrip tests are all solid. One lifecycle concern: the zstd.Decoder spawns internal goroutines on init and must be explicitly closed; the current design gives no shutdown path to call Close() on the codec. Two nits: an unusual x/net pseudo-version pin and a missing test for the version-mismatch Decode branch.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (4)
exporter/loadbalancingexporter/config.go (1)
81-83: Consider extendingConfig.Validate()to include other validation rules.Currently,
Config.Validate()only delegates toQueueSettings.Validate(). If there are other configuration fields that should be validated (e.g.,ResolverSettings,RoutingKeyvsRoutingAttributesmutual exclusivity), consider adding those checks here or documenting why they're validated elsewhere.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/loadbalancingexporter/config.go` around lines 81 - 83, The current Config.Validate method only calls QueueSettings.Validate; extend it to also validate other fields such as ResolverSettings (e.g., ensure required resolver fields are present/consistent) and enforce mutual exclusivity between RoutingKey and RoutingAttributes (or validate their required formats) before returning; update the Config.Validate function to call QueueSettings.Validate(), then perform these additional checks (reference Config.Validate, QueueSettings.Validate, ResolverSettings, RoutingKey, RoutingAttributes) and return a descriptive error when any new validation fails.exporter/loadbalancingexporter/factory_test.go (1)
234-242: Test case may have unexpected interaction with compression.The test "Should have all resilience options if defined" sets queue config and backoff but doesn't set
PayloadCompression. When combined with the previous test case logic, this correctly expects 3 options (timeout, queue, retry). However, if someone adds compression here, the count would change. Consider adding a comment clarifying the expected options for maintainability.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/loadbalancingexporter/factory_test.go` around lines 234 - 242, The test "Should have all resilience options if defined" relies on buildExporterResilienceOptions(cfg) returning exactly 3 options but doesn't set cfg.PayloadCompression, which can change the expected count if compression is later introduced; update the test to explicitly set cfg.PayloadCompression (or otherwise control compression) and add a short comment explaining the expectation so future changes won't break the count — locate the test function and the Config used (createDefaultConfig, Config, PayloadCompression) and ensure PayloadCompression is set to a known value (e.g., disabled) before asserting Len(...)=3 and add a one-line comment describing why 3 options are expected.exporter/loadbalancingexporter/config_test.go (1)
29-41: Consider using subtests and testing the empty string case explicitly.The test covers the main cases well. For better test isolation and clarity, consider using
t.Run()subtests. Also, explicitly testing the empty string case would verify the default behavior documented in the validation logic.♻️ Suggested improvement
func TestConfigValidatePayloadCompression(t *testing.T) { - cfg := createDefaultConfig().(*Config) - require.NoError(t, cfg.Validate()) - - cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy - require.NoError(t, cfg.Validate()) - - cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionZstd - require.NoError(t, cfg.Validate()) - - cfg.QueueSettings.PayloadCompression = QueuePayloadCompression("invalid") - require.Error(t, cfg.Validate()) + tests := []struct { + name string + compression QueuePayloadCompression + wantErr bool + }{ + {name: "default (none)", compression: QueuePayloadCompressionNone, wantErr: false}, + {name: "empty string", compression: "", wantErr: false}, + {name: "snappy", compression: QueuePayloadCompressionSnappy, wantErr: false}, + {name: "zstd", compression: QueuePayloadCompressionZstd, wantErr: false}, + {name: "invalid", compression: QueuePayloadCompression("invalid"), wantErr: true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.QueueSettings.PayloadCompression = tt.compression + if tt.wantErr { + require.Error(t, cfg.Validate()) + } else { + require.NoError(t, cfg.Validate()) + } + }) + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/loadbalancingexporter/config_test.go` around lines 29 - 41, Refactor TestConfigValidatePayloadCompression to use t.Run subtests for each case (e.g., "default/empty", "snappy", "zstd", "invalid") and in each subtest create a fresh cfg via createDefaultConfig().(*Config) before mutating QueueSettings.PayloadCompression; explicitly include a subtest that sets PayloadCompression = "" to validate the documented default behavior, assert NoError for valid values ("", QueuePayloadCompressionSnappy, QueuePayloadCompressionZstd) and Error for QueuePayloadCompression("invalid"), and keep assertions local to each subtest for isolation.exporter/loadbalancingexporter/payload_codec_test.go (1)
36-48: Consider adding tests for edge cases.The current tests cover basic invalid payload scenarios. Consider adding tests for:
- Unsupported version number (to verify the version check error path)
- Unsupported codec ID in a valid header
- Empty payload after header
- Large payloads to verify compression actually reduces size
♻️ Additional test cases
func TestQueuePayloadCodecDecodeUnsupportedVersion(t *testing.T) { codec := newQueuePayloadCodec(QueuePayloadCompressionSnappy) encoded, err := codec.Encode([]byte("hello")) require.NoError(t, err) // Corrupt version byte encoded[3] = 99 _, err = codec.Decode(encoded) require.Error(t, err) require.Contains(t, err.Error(), "unsupported version") } func TestQueuePayloadCodecDecodeUnsupportedCodec(t *testing.T) { codec := newQueuePayloadCodec(QueuePayloadCompressionSnappy) encoded, err := codec.Encode([]byte("hello")) require.NoError(t, err) // Corrupt codec ID byte encoded[4] = 99 _, err = codec.Decode(encoded) require.Error(t, err) require.Contains(t, err.Error(), "unsupported codec") }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/loadbalancingexporter/payload_codec_test.go` around lines 36 - 48, Extend the TestQueuePayloadCodecDecodeRejectsInvalidPayload test suite by adding new tests that exercise the version-check and codec-ID error paths and edge payload sizes: use newQueuePayloadCodec(QueuePayloadCompressionSnappy) and codec.Encode to produce a valid encoded slice, then corrupt the version byte (e.g., encoded[3]=99) and assert codec.Decode returns an error containing "unsupported version"; corrupt the codec ID byte (e.g., encoded[4]=99) and assert the error contains "unsupported codec"; add a test that truncates the encoded slice to remove the payload after the header and assert Decode returns an error for empty payload; and add a large payload test (e.g., large repeated byte slice), assert Encode succeeds and that compressed length is smaller than original and that Decode returns the original data.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@exporter/loadbalancingexporter/payload_codec.go`:
- Around line 98-107: The initZstd function in queuePayloadCodec must handle the
case where zstd.NewWriter succeeds but zstd.NewReader fails by closing the
created encoder to avoid leaking resources and setting c.zstdErr to the reader
error; modify initZstd so after creating c.zstdEnc it attempts to create
c.zstdDec and if that second call returns an error it calls c.zstdEnc.Close()
before assigning c.zstdErr and returning. Also add a Close() method on
queuePayloadCodec that safely closes c.zstdEnc and c.zstdDec (checking for nil)
to release resources when the codec is no longer needed; reference the existing
initZstd, c.zstdEnc, c.zstdDec, c.zstdErr and implement Close() to be idempotent
and safe for use after initZstd.
---
Nitpick comments:
In `@exporter/loadbalancingexporter/config_test.go`:
- Around line 29-41: Refactor TestConfigValidatePayloadCompression to use t.Run
subtests for each case (e.g., "default/empty", "snappy", "zstd", "invalid") and
in each subtest create a fresh cfg via createDefaultConfig().(*Config) before
mutating QueueSettings.PayloadCompression; explicitly include a subtest that
sets PayloadCompression = "" to validate the documented default behavior, assert
NoError for valid values ("", QueuePayloadCompressionSnappy,
QueuePayloadCompressionZstd) and Error for QueuePayloadCompression("invalid"),
and keep assertions local to each subtest for isolation.
In `@exporter/loadbalancingexporter/config.go`:
- Around line 81-83: The current Config.Validate method only calls
QueueSettings.Validate; extend it to also validate other fields such as
ResolverSettings (e.g., ensure required resolver fields are present/consistent)
and enforce mutual exclusivity between RoutingKey and RoutingAttributes (or
validate their required formats) before returning; update the Config.Validate
function to call QueueSettings.Validate(), then perform these additional checks
(reference Config.Validate, QueueSettings.Validate, ResolverSettings,
RoutingKey, RoutingAttributes) and return a descriptive error when any new
validation fails.
In `@exporter/loadbalancingexporter/factory_test.go`:
- Around line 234-242: The test "Should have all resilience options if defined"
relies on buildExporterResilienceOptions(cfg) returning exactly 3 options but
doesn't set cfg.PayloadCompression, which can change the expected count if
compression is later introduced; update the test to explicitly set
cfg.PayloadCompression (or otherwise control compression) and add a short
comment explaining the expectation so future changes won't break the count —
locate the test function and the Config used (createDefaultConfig, Config,
PayloadCompression) and ensure PayloadCompression is set to a known value (e.g.,
disabled) before asserting Len(...)=3 and add a one-line comment describing why
3 options are expected.
In `@exporter/loadbalancingexporter/payload_codec_test.go`:
- Around line 36-48: Extend the TestQueuePayloadCodecDecodeRejectsInvalidPayload
test suite by adding new tests that exercise the version-check and codec-ID
error paths and edge payload sizes: use
newQueuePayloadCodec(QueuePayloadCompressionSnappy) and codec.Encode to produce
a valid encoded slice, then corrupt the version byte (e.g., encoded[3]=99) and
assert codec.Decode returns an error containing "unsupported version"; corrupt
the codec ID byte (e.g., encoded[4]=99) and assert the error contains
"unsupported codec"; add a test that truncates the encoded slice to remove the
payload after the header and assert Decode returns an error for empty payload;
and add a large payload test (e.g., large repeated byte slice), assert Encode
succeeds and that compressed length is smaller than original and that Decode
returns the original data.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
exporter/loadbalancingexporter/go.sumis excluded by!**/*.sum
📒 Files selected for processing (7)
exporter/loadbalancingexporter/config.goexporter/loadbalancingexporter/config_test.goexporter/loadbalancingexporter/factory.goexporter/loadbalancingexporter/factory_test.goexporter/loadbalancingexporter/go.modexporter/loadbalancingexporter/payload_codec.goexporter/loadbalancingexporter/payload_codec_test.go
There was a problem hiding this comment.
cubic analysis
1 issue found across 8 files
Confidence score: 4/5
- Potential resource leak in
initZstd()ifzstd.NewReader(nil)fails afterzstd.NewWriter(nil)succeeds, leaving the encoder unclosed andsync.Onceprevents retry; moderate impact but likely rare. - Overall risk appears limited to error-path cleanup, so this feels safe to merge with minor follow-up.
- Pay close attention to
exporter/loadbalancingexporter/payload_codec.go- ensure encoder is closed on reader init failure.
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="exporter/loadbalancingexporter/payload_codec.go">
<violation number="1" location="exporter/loadbalancingexporter/payload_codec.go:104">
P2: Resource leak in `initZstd()`: if `zstd.NewReader(nil)` fails after `zstd.NewWriter(nil)` succeeds, the already-allocated encoder is never closed. Because `sync.Once` prevents re-entry, the orphaned encoder will persist for the lifetime of the process. Add cleanup of `zstdEnc` when reader creation fails.</violation>
</file>
Linked issue analysis
Linked issue: SAW-6556: Compressed Sending Queue + file_storage for Collector exporters
| Status | Acceptance criteria | Notes |
|---|---|---|
| ✅ | Feature off by default; no behavior change when disabled | Default config sets PayloadCompression to none |
| ✅ | Applies only to loadbalancingexporter | All changes are within loadbalancingexporter package |
| ✅ | Config option sending_queue.payload_compression added with values none|snappy|zstd and default none | Added QueuePayloadCompression type and constants plus QueueSettings |
| ✅ | Validation of config option allowed values | Validate() checks allowed values and returns error on invalid |
| ✅ | Queue payload codec implementation for snappy and zstd | Implemented codec with snappy and zstd compress/decompress |
| ✅ | Wire codec via exporterhelper queue payload codec hook | Factory appends WithQueueBatchPayloadCodec when enabled |
| ❌ | Ensure compatibility with existing queue semantics: ordering | No tests or code verifying preserved ordering |
| ❌ | Ensure compatibility with existing queue semantics: retry/backoff | No tests validating retry/backoff with compressed entries |
| ❌ | Ensure compatibility with existing queue semantics: shutdown/drain | No shutdown/drain behavior tests or handling changes shown |
| ❌ | Ensure compatibility with existing queue semantics: restart/recovery for persistent queue | No tests for persistent queue restart/recovery with compression |
| ✅ | Keep existing sending_queue.storage semantics (file_storage/), do not introduce memory|file_storage enum | No new storage enum introduced; storage left unchanged |
| ❌ | Add telemetry for enqueue/dequeue bytes (raw vs stored) | No telemetry metrics or instrumentation added in diffs |
| ❌ | Add telemetry for compression ratio | No telemetry for compression ratio present in changes |
| ❌ | Add telemetry for compression/decompression CPU time (or proxy) | No CPU-time or proxy telemetry added in code diffs |
| ❌ | Correctness tests pass for enqueue/dequeue integrity | Only codec roundtrip tests exist; no enqueue/dequeue queue tests |
| ❌ | Correctness tests pass for retry behavior with compressed entries | No tests exercising retry with compressed queue entries |
| ❌ | Correctness tests pass for persistent queue restart/recovery | No persistent queue restart/recovery tests included |
| ❌ | Benchmarks: 14k EPS (stable) | No benchmark artifacts or benchmark code present |
| ❌ | Benchmarks: 16k EPS (stress) | No benchmark artifacts or stress test code present |
| ❌ | Benchmark variant: memory queue + none | No benchmark variants implemented or recorded |
| ❌ | Benchmark variant: memory queue + snappy (and/or zstd) | No benchmark for memory+compression variants present |
| ❌ | Benchmark variant: file_storage queue + none | No file_storage benchmark variants present |
| ❌ | Benchmark variant: file_storage queue + compression | No file_storage+compression benchmarks present |
| ❌ | Benchmark report includes achieved EPS, p95/p99, error/fallback rates | No benchmark report or metrics documented in PR |
| ❌ | Benchmark report includes collector CPU and queue fill slope | No CPU/queue-fill metrics or report included |
| ❌ | Benchmark report includes RSS and disk usage | No RSS/disk usage measurements or reports provided |
| ❌ | Benchmark report includes codec break-even guidance | No analysis or break-even guidance included in PR |
| Tests + benchmark artifacts delivered | Codec/config tests added but benchmarks/artifacts missing | |
| ❌ | Tuning/failure-mode documentation for operators | No operator-facing docs or tuning guidance in diffs |
| ✅ | Implementation in LB exporter path (deliverable) | Code changes and codec live under loadbalancingexporter package |
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: ✅ APPROVE — all prior issues resolved, no new concerns.
Prior review had 1 concern and 2 nits:
✅ payload_codec.go:100 — Fixed: Close() method added with closeOnce + shutdownWithCodec() wires it into all three exporter shutdown paths via multierr.Append
✅ payload_codec_test.go:36 — Fixed: version-mismatch path now covered (encoded[3] = 0xFF)
🟢 go.mod:98 — x/net pseudo-version nit not re-flagged (advisory, one shot)
New code quality check: initZstd error handling, closeOnce idempotency, nil-guard on both encoder and decoder in Close(), and correct application of shutdownWithCodec across all signal types (traces, logs, metrics) — all solid. No new findings.
Unresolved threads: 0. Clean to merge.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.chloggen/avoid-extra-copy.yaml:
- Line 5: The changelog entry currently uses "component: exporter/coralogix"
which is incorrect for this PR; replace that component value with one
appropriate to the loadbalancing exporter compression change (e.g., "component:
exporter/loadbalancing" or "component: exporter/queue-compression"), update the
note/subtext to describe "loadbalancing exporter queue payload compression"
(remove the PrivateLink mention), and set the issue reference to the
loadbalancing exporter compression issue/PR number so release notes will
describe the correct feature; specifically edit the "component:
exporter/coralogix" line and the accompanying note/subtext/issue fields in the
existing YAML entry.
In @.chloggen/upgrade-contrib-version.yaml:
- Around line 7-10: Change the broad changelog entries so they reflect the PR
feature: update the YAML key "component" from "all" to the specific scope (e.g.,
"loadbalancer" or "loadbalancing/compression") and replace the "note" text with
a concise description of the loadbalancing compression backport (mentioning any
dependency bumps only if they are directly part of that backport), ensuring the
note explicitly references the loadbalancing compression fix/backport rather
than a generic dependency-update message.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
.chloggen/avoid-extra-copy.yaml.chloggen/config.yaml.chloggen/oracle_query_traceid_fix.yaml.chloggen/saw-6556-loadbalancing-compressed-queue.yaml.chloggen/upgrade-contrib-version.yaml
✅ Files skipped from review due to trivial changes (2)
- .chloggen/oracle_query_traceid_fix.yaml
- .chloggen/saw-6556-loadbalancing-compressed-queue.yaml
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: 💬 COMMENT — no new concerns, but 2 unresolved thread(s) remain.
The only new commit since approval is d7abd21c (docs: normalize chloggen component keys) — three renames (e.g., coralogixexporter → exporter/coralogix). No functional code changed. No new architectural findings.
However: 2 review threads are still unresolved. Cannot re-approve until all threads are resolved. Resolve them and this is clear to merge.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: ✅ APPROVE — all threads resolved
All prior findings addressed:
payload_codec.go:100—Close()+ nil guard added ✅payload_codec.go:105—zstdEnccleanup onzstd.NewReaderfailure ✅payload_codec_test.go:36— version-mismatch decode test added ✅go.mod:193— pseudo-version is transitive (k8s.io/apimachinery → grpc chain), not manually pinned ✅
Zero new findings. Ready to merge.
|
Dependency update: this branch now pins exporterhelper to , which comes from collector PR Sawmills/opentelemetry-collector#6 (in-memory encoded queue support). |
|
Dependency update: this branch now pins exporterhelper to github.com/Sawmills/opentelemetry-collector/exporter/exporterhelper v0.140.1-0.20260303212412-92b47b11df4d, from collector PR Sawmills/opentelemetry-collector#6 (in-memory encoded queue support). |
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: ✅ APPROVE — prior issues resolved, new changes clean
New commits add compress_in_memory opt-in for in-memory queue compression. Validation guards are correct (requires enabled=true and a non-none codec). Factory wiring is conditional on payloadCodec != nil. Tests cover both the validation path and the option-count assertion. go.mod bump is routine. One nit flagged.
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
exporter/loadbalancingexporter/factory_test.go (1)
204-253: Prefer behavior-oriented assertions over raw option-count assertions.These tests currently depend on exact option counts, which can become fragile when resilience options evolve. Consider asserting presence/absence of specific behaviors per scenario instead of strict lengths.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/loadbalancingexporter/factory_test.go` around lines 204 - 253, Replace brittle length-based assertions in TestBuildExporterResilienceOptions by checking for specific behaviors/options returned by buildExporterResilienceOptions instead of exact counts: call buildExporterResilienceOptions(o, cfg, codec) and assert the presence or absence of concrete options (e.g., timeout option, queue/queue-batching option, payload-compression option, in-memory compression option, backoff option) by examining the returned slice for expected exporterhelper.Option types or their effects (wrap/unary funcs or known exported helpers), using identifiers like buildExporterResilienceOptions, newQueuePayloadCodec, Config.TimeoutSettings, Config.QueueSettings.PayloadCompression, Config.QueueSettings.CompressInMemory and Config.BackOffConfig to locate and adapt each sub-test to assert the actual behavior rather than assert.Len.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@exporter/loadbalancingexporter/factory.go`:
- Around line 171-179: The function newQueuePayloadCodecIfEnabled currently
treats both the empty-string legacy value and the explicit "none"
(QueuePayloadCompressionNone) the same, causing the queue codec to be disabled
and breaking decoding of previously compressed payloads; change the logic so
that only an empty string returns nil (legacy behavior) while an explicit
QueuePayloadCompressionNone still calls
newQueuePayloadCodec(QueuePayloadCompressionNone) and returns a codec instance
(so WithQueueBatchPayloadCodec is wired and can decode older entries); apply the
same change to the other analogous block referenced in the comment (the earlier
82-93 region) to ensure consistent behavior.
In `@exporter/loadbalancingexporter/go.mod`:
- Line 221: The go.mod replace pins exporterhelper to commit 92b47b11df4d (which
adds WithQueueBatchInMemoryEncoding) but the loadbalancingexporter backport
requires the codec hook WithQueueBatchPayloadCodec introduced in commit
8ffd61d257fd; verify whether commit 92b47b11df4d actually contains
WithQueueBatchPayloadCodec and, if not, update the replace to point to
8ffd61d257fd (or a commit that includes both features) so the
loadbalancingexporter can use WithQueueBatchPayloadCodec; ensure references to
WithQueueBatchInMemoryEncoding and WithQueueBatchPayloadCodec and the
loadbalancingexporter integration compile against the chosen fork commit.
---
Nitpick comments:
In `@exporter/loadbalancingexporter/factory_test.go`:
- Around line 204-253: Replace brittle length-based assertions in
TestBuildExporterResilienceOptions by checking for specific behaviors/options
returned by buildExporterResilienceOptions instead of exact counts: call
buildExporterResilienceOptions(o, cfg, codec) and assert the presence or absence
of concrete options (e.g., timeout option, queue/queue-batching option,
payload-compression option, in-memory compression option, backoff option) by
examining the returned slice for expected exporterhelper.Option types or their
effects (wrap/unary funcs or known exported helpers), using identifiers like
buildExporterResilienceOptions, newQueuePayloadCodec, Config.TimeoutSettings,
Config.QueueSettings.PayloadCompression, Config.QueueSettings.CompressInMemory
and Config.BackOffConfig to locate and adapt each sub-test to assert the actual
behavior rather than assert.Len.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
exporter/loadbalancingexporter/go.sumis excluded by!**/*.sum
📒 Files selected for processing (7)
.chloggen/saw-6556-loadbalancing-compressed-queue.yamlexporter/loadbalancingexporter/README.mdexporter/loadbalancingexporter/config.goexporter/loadbalancingexporter/config_test.goexporter/loadbalancingexporter/factory.goexporter/loadbalancingexporter/factory_test.goexporter/loadbalancingexporter/go.mod
✅ Files skipped from review due to trivial changes (1)
- exporter/loadbalancingexporter/README.md
🚧 Files skipped from review as they are similar to previous changes (1)
- .chloggen/saw-6556-loadbalancing-compressed-queue.yaml
There was a problem hiding this comment.
🧹 Nitpick comments (1)
exporter/loadbalancingexporter/factory_test.go (1)
221-223: Make queue enablement explicit in these resilience tests.These assertions currently depend on
exporterhelper.NewDefaultQueueConfig()defaultingEnabled=true. Setcfg.QueueSettings.Enabled = trueexplicitly in each queue-path test to avoid brittle behavior if upstream defaults change.Proposed test-hardening diff
@@ cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.Enabled = true assert.Len(t, buildExporterResilienceOptions(o, cfg, newQueuePayloadCodecIfEnabled(cfg)), 3) @@ cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.Enabled = true cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy @@ cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.Enabled = true cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy cfg.QueueSettings.CompressInMemory = true @@ cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.Enabled = true cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionNone cfg.BackOffConfig = configretry.NewDefaultBackOffConfig()Also applies to: 229-233, 238-243, 248-252
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/loadbalancingexporter/factory_test.go` around lines 221 - 223, The tests rely on exporterhelper.NewDefaultQueueConfig() implicitly enabling the queue; make the queue enablement explicit by setting cfg.QueueSettings.Enabled = true in each resilience test that exercises the queue path (those calling buildExporterResilienceOptions and newQueuePayloadCodecIfEnabled). Locate the test cases that set cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() (and the other instances at the noted ranges) and add cfg.QueueSettings.Enabled = true immediately after creating the default queue config so the assertions remain stable if upstream defaults change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@exporter/loadbalancingexporter/factory_test.go`:
- Around line 221-223: The tests rely on exporterhelper.NewDefaultQueueConfig()
implicitly enabling the queue; make the queue enablement explicit by setting
cfg.QueueSettings.Enabled = true in each resilience test that exercises the
queue path (those calling buildExporterResilienceOptions and
newQueuePayloadCodecIfEnabled). Locate the test cases that set
cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() (and
the other instances at the noted ranges) and add cfg.QueueSettings.Enabled =
true immediately after creating the default queue config so the assertions
remain stable if upstream defaults change.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
exporter/loadbalancingexporter/factory.goexporter/loadbalancingexporter/factory_test.goexporter/loadbalancingexporter/payload_codec.goexporter/loadbalancingexporter/payload_codec_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- exporter/loadbalancingexporter/payload_codec.go
- exporter/loadbalancingexporter/payload_codec_test.go
Summary
sending_queue.payload_compressionto loadbalancing exporternone(default),snappy,zstdDependency
8ffd61d257fdValidation
Note
Medium Risk
Adds new queue encoding/decoding behavior (including a custom
exporterhelperreplace) on the loadbalancing exporter’s persistence/in-memory queue path, which could impact reliability and upgrade compatibility if misconfigured or if decoding fails.Overview
Adds optional payload compression to
loadbalancingexporter’ssending_queuevia new config fieldspayload_compression(none/snappy/zstd) andcompress_in_memory, including validation and README docs.Implements a versioned queue payload codec (magic header + version + codec id) and wires it into exporter creation via
exporterhelper.WithQueueBatchPayloadCodec, enabling persistent-queue encoding and optional in-memory encoded storage; codec resources (zstd) are closed on exporter shutdown.Updates unit tests for config/codec behavior, adjusts module deps to include snappy/zstd and pins
exporterhelpervia areplace, and updates changelog configuration/entries (including component key normalization and a new SAW-6556 release note).Written by Cursor Bugbot for commit 86cdc59. This will update automatically on new commits. Configure here.
Summary by cubic
Adds optional compression for loadbalancingexporter sending_queue payloads, with an opt-in in-memory mode, to reduce RAM/disk use during backpressure. Scoped to SAW-6556 (LB-only, file_storage-compatible) with safe restarts and proper resource cleanup.
New Features
Bug Fixes
Written for commit 86cdc59. Summary will update on new commits.
Summary by CodeRabbit
Greptile Summary
Backports queue payload compression feature to loadbalancing exporter, supporting snappy and zstd compression with a versioned codec format (magic header 'sqc' + version + codec ID).
queuePayloadCodecwith lazy zstd initialization viasync.OnceQueueSettingswrapper withPayloadCompressionfield and validationexporterhelper.WithQueueBatchPayloadCodec()when compression is enabled8ffd61d257fdfor codec hook supportnone, accepts empty string in validationConfidence Score: 5/5
Important Files Changed
QueueSettingswrapper withPayloadCompressionfield and validationWithQueueBatchPayloadCodecFlowchart
%%{init: {'theme': 'neutral'}}%% flowchart TD A[Config: sending_queue.payload_compression] -->|none/empty| B[No codec applied] A -->|snappy/zstd| C[Create queuePayloadCodec] C --> D[Encode payload] D --> E[Add magic header 'sqc'] E --> F[Add version byte 1] F --> G[Add codec ID 1=snappy, 2=zstd] G --> H[Append compressed data] H --> I[Store in queue] I --> J[Decode from queue] J --> K{Validate header} K -->|Invalid| L[Error: invalid payload] K -->|Valid| M[Extract codec ID from payload] M --> N{Codec ID} N -->|1| O[snappy.Decode] N -->|2| P[zstd.DecodeAll] O --> Q[Return decompressed data] P --> QLast reviewed commit: 6b848ab