loadbalancingexporter: batch logs after routing#19
Conversation
Expose optional per-backend log batching with backward-compatible defaults and preserve the legacy path when disabled. Refs: SAW-6744
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds an optional per-backend in-memory log batching feature to the loadbalancing exporter, including config/validation, a concurrency-safe logBatcher with per-endpoint batchers, payload codec adapter, lifecycle integration with load balancer/exporters, telemetry, and comprehensive tests and docs. Changes
Sequence DiagramsequenceDiagram
participant Client
participant LogExporter as Log Exporter
participant LoadBalancer as Load Balancer
participant LogBatcher as Log Batcher
participant BackendBatcher as Backend Batcher
participant Downstream as Downstream Exporter
Client->>LogExporter: ConsumeLogs(plog.Logs)
LogExporter->>LogExporter: iterate resources/scopes/records
loop per record
LogExporter->>LoadBalancer: resolve endpoint/exporter for record
LoadBalancer-->>LogExporter: endpoint + exporter
LogExporter->>LogBatcher: Enqueue(endpoint, single-record Logs)
LogBatcher->>BackendBatcher: append record, update pending counters
alt flush triggered (count/bytes/interval)
BackendBatcher->>BackendBatcher: drain pending logs
BackendBatcher->>Downstream: ConsumeLogs(merged batch)
Downstream-->>BackendBatcher: result
BackendBatcher->>LogBatcher: emit telemetry (counts/bytes/errors)
end
end
LogExporter->>Client: return result
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly Related PRs
Suggested Reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 51e7d42b16
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
0 issues found across 1 file (changes from recent commits).
Requires human review: This PR introduces significant new logic for concurrent per-backend log batching, including new goroutines, lifecycle management, and buffering which require human review for safety.
There was a problem hiding this comment.
6 issues found across 11 files
Confidence score: 2/5
- There is a concrete regression risk in
exporter/loadbalancingexporter/log_exporter.go: blockingEnqueuewithout context cancellation can causeConsumeLogsto stall indefinitely when backend queues are full, which is user-facing under load. exporter/loadbalancingexporter/log_batcher.gohas a likely data-loss path:flush()drainspendingbefore async send completion, and failures are only debug-logged, so dropped batches may not be retried or surfaced.exporter/loadbalancingexporter/loadbalancer.goandexporter/loadbalancingexporter/log_exporter.goboth indicate concurrency hazards during resolver churn (global routing blocked underupdateLock, plus race between routing and enqueue), which raises instability risk in dynamic backend updates.- Pay close attention to
exporter/loadbalancingexporter/log_exporter.go,exporter/loadbalancingexporter/log_batcher.go,exporter/loadbalancingexporter/loadbalancer.go,exporter/loadbalancingexporter/log_exporter_test.go,exporter/loadbalancingexporter/config.go- blocking behavior, potential log loss, lock contention/races, reduced rollout-safety test coverage, and premature config exposure should be addressed before merge.
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="exporter/loadbalancingexporter/config.go">
<violation number="1" location="exporter/loadbalancingexporter/config.go:44">
P2: According to linked Linear issue SAW-6744, this should stay on internal defaults for the first implementation; adding `log_batcher` here exposes a customer-facing config knob before that rollout contract is satisfied.</violation>
</file>
<file name="exporter/loadbalancingexporter/log_exporter.go">
<violation number="1" location="exporter/loadbalancingexporter/log_exporter.go:142">
P1: Enqueue is blocking without context cancellation, so a slow backend can stall `ConsumeLogs` indefinitely once the backend queue fills.</violation>
<violation number="2" location="exporter/loadbalancingexporter/log_exporter.go:142">
P2: Race condition between routing and enqueue during resolver changes: `exporterAndEndpoint` resolves the exporter, then `Enqueue` calls `getOrCreateBackend`. If `removeExtraExporters` runs between these two steps, it calls `batcher.Remove` (which drains and deletes the backend) and then starts `exp.Shutdown()` on the child exporter. The subsequent `Enqueue` → `getOrCreateBackend` will re-create a new backend for the removed endpoint referencing the now-shutting-down exporter, causing log records to be sent to a closed exporter and potentially dropped. Consider holding the loadbalancer read-lock across both the endpoint lookup and the enqueue, or checking exporter liveness inside `getOrCreateBackend`.</violation>
</file>
<file name="exporter/loadbalancingexporter/log_batcher.go">
<violation number="1" location="exporter/loadbalancingexporter/log_batcher.go:281">
P1: Async flush errors (size-triggered and timeout-triggered) are only debug-logged, but the batch data has already been moved out of `pending` in `flush()` (`drained := *pending; *pending = plog.NewLogs()`). If `b.send()` fails, the drained logs are silently dropped with no retry and no way for the upstream caller to detect the failure — `Enqueue` already returned `nil`. At minimum, the `droppedRecords` counter should be incremented on flush error so the data loss is observable, and the log level should be `Warn` or `Error` rather than `Debug`.</violation>
</file>
<file name="exporter/loadbalancingexporter/log_exporter_test.go">
<violation number="1" location="exporter/loadbalancingexporter/log_exporter_test.go:600">
P2: This weakens the rolling-update test so it no longer verifies convergence to the final backend set.
According to linked Linear issue SAW-6744, resolver-removal behavior must be validated, and `NotEmpty` allows stale intermediate resolver states to pass.</violation>
</file>
<file name="exporter/loadbalancingexporter/loadbalancer.go">
<violation number="1" location="exporter/loadbalancingexporter/loadbalancer.go:208">
P1: Calling `onExporterRemove` synchronously while holding `updateLock` can block global routing during resolver churn if a backend drain is slow.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
[ARCH-REVIEW] REQUEST_CHANGES — 2 concerns before merge.
Solid design: per-backend goroutines, correct shutdown ordering (batcher drains before load balancer closes), proper timer drain in flush(). Two gaps need addressing: a metric that's registered but never written, and an unbounded block on Enqueue with no context escape hatch.
There was a problem hiding this comment.
[ARCH-REVIEW] REQUEST_CHANGES — 2 concerns, 2 unresolved threads
Adds a per-backend async log batcher to the loadbalancing exporter with configurable flush-on-size, flush-on-bytes, flush-on-timeout, and drain-on-resolver-removal. Architecture is sound and the default-off flag preserves backward compatibility. Two concurrency issues need addressing before merge.
There was a problem hiding this comment.
[ARCH-REVIEW] ✅ APPROVE — Prior issues addressed. Clean to merge.
Prior review had 2 concerns. Status:
✅ log_batcher.go — Fixed: Enqueue now takes ctx context.Context, adds case <-ctx.Done() escape hatch. acquireBackend calls inflight.Add(1) inside b.mu.RLock() — TOCTOU gap fully closed.
✅ log_batcher.go — Fixed: droppedRecords incremented in flush() on send failure.
Unresolved threads: 0
There was a problem hiding this comment.
[ARCH-REVIEW] COMMENT — Clarification on Cursor Bugbot finding.
The Bugbot claim that onExporterRemove is called while holding updateLock.Lock() is incorrect. Looking at onBackendChanges (loadbalancer.go ~L154-L171), the lock is explicitly released (lb.updateLock.Unlock()) before lb.drainRemovedExporters(ctx, removed) is called. The blocking flush happens outside the lock — ConsumeLogs callers are not stalled during drain.
However, the // TODO: set a timeout? comment on line ~162 is real: drainRemovedExporters uses context.Background() with no deadline, which can block the resolver goroutine indefinitely if a removed backend is unreachable. This is a resolver-goroutine concern, not a pipeline-stall — but it does mean backend churn against unreachable endpoints could pile up goroutines. Worth addressing in a follow-up, but not a blocker given the async Shutdown path.
My original REQUEST_CHANGES on this PR stands — two unresolved concerns remain (metric registered but never written; unbounded Enqueue block with no context escape). The Bugbot issue is a false positive on lock scope.
There was a problem hiding this comment.
1 issue found across 48 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="exporter/loadbalancingexporter/log_batcher.go">
<violation number="1" location="exporter/loadbalancingexporter/log_batcher.go:173">
P1: This wait ignores `ctx` and can deadlock shutdown when an in-flight enqueue is stuck behind a full backend queue.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: ✅ APPROVE — all prior concerns addressed.
Prior review had 2 blocking concerns. Status:
✅ log_batcher.go:Enqueue — Fixed: ctx context.Context added with <-ctx.Done() branch.
✅ log_batcher.go:flush — Fixed: droppedRecords.Add(...) now fires in the error path.
New code (acquireBackend double-checked lock, drainRemovedExporters, markStopping) is correct. stopAndFlush ctx-cancel path is safe — buffered done channel prevents goroutine leak on early cancellation. Unresolved threads: 0.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: code concerns resolved, 2 unresolved threads block approval.
Prior review had 2 blocking concerns:
✅ log_batcher.go — Enqueue now takes ctx context.Context; select has case <-ctx.Done() branch. Fixed.
✅ log_batcher.go — droppedRecords.Add now called on flush error path. Fixed.
New commits also look clean: acquireBackend correctly guards against a stopping exporter; ctx propagation through consumeLogRecord → Enqueue is correct; rolling-update test updated to accept context.Canceled appropriately.
Unresolved threads: 2 (filed by other reviewers). Resolve all threads before merge.
…g batcher Measure incoming chunk size before merging into pending batch and accumulate incrementally instead of calling sizer.LogsSize(pending) after every enqueue. Since ConsumeLogs enqueues one record at a time, the old pattern re-serialized the entire accumulated batch on every call — O(n²) in records-per-batch. Proto repeated fields (ResourceLogs) are wire-additive, so incremental accounting is accurate. pendingBytes is reset to 0 in flush(), so the counter stays correct across flush boundaries.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: COMMENT — prior concerns fixed, one nit remains.
Prior review had 2 🟡 concerns. Status:
✅ log_batcher.go — Enqueue ctx + <-ctx.Done() branch added; acquireBackend inflight WaitGroup pattern is correct.
✅ log_batcher.go — droppedRecords now incremented on flush error at flush() line.
🟢 log_batcher.go:421 — Unregister() error silently dropped (see inline).
Unresolved threads: 0
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: code looks good, but 3 unresolved threads remain.
Prior review had 2 🟡 concerns. Status:
✅ Enqueue context cancellation — fixed: ctx parameter added, select includes ctx.Done arm, callers propagate from ConsumeLogs.
✅ droppedRecords never incremented — fixed: metric now written in flush() on send error.
Unresolved threads: 3. Resolve all threads before merge.
There was a problem hiding this comment.
[ARCH-REVIEW] ✅ APPROVE — all prior concerns addressed.
✅ Enqueue now takes ctx context.Context with a case <-ctx.Done() escape hatch — backpressure propagation concern resolved.
✅ droppedRecords metric incremented in flush error path — observability gap closed.
0 unresolved threads.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: ✅ APPROVE — both prior findings resolved.
Prior review had 2 concerns:
✅ Enqueue — ctx param added, case <-ctx.Done() branch present. Backpressure escape hatch works correctly.
✅ droppedRecords — b.telemetry.droppedRecords.Add(...) now fires in the if err != nil block of flush. Metric is live.
Unresolved threads: 0
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: APPROVE — both prior concerns addressed.
Prior review had 2 concerns:
✅ log_batcher.go — Enqueue now takes ctx context.Context with case <-ctx.Done() branch; backpressure propagation fixed.
✅ log_batcher.go — droppedRecords.Add(ctx, int64(records), ...) now called in flush() error path; metric is live.
Zero unresolved threads. Clean to merge.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: APPROVE
Prior review had 2 concerns. Status:
✅ Enqueue now takes ctx context.Context with <-ctx.Done() cancellation branch — backpressure propagation fixed.
✅ droppedRecords metric now incremented on flush error — observability gap closed.
New code (inflight tracking, acquireBackend with isStopping guard, scheduleBackendCleanup, mergeQueuedRequests batch drain) reviewed — no new concerns.
Unresolved threads: 0
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: APPROVE
Prior review had 2 concerns. Status:
✅ log_batcher.go — Fixed: Enqueue now accepts context.Context with cancellation select; acquireBackend tracks inflight via sync.WaitGroup with proper shutdown/remove drain ordering.
✅ log_batcher.go — Fixed: droppedRecords metric now incremented on send() failure in flush().
New code also correctly introduces atomic started flag, wrappedExporter stopping coordination with mutex-guarded consumeWG, bounded channel drain in mergeQueuedRequests, and resource/scope dedup in mergeLogs/insertLogRecord. No new concerns.
There was a problem hiding this comment.
[ARCH-REVIEW] ✅ APPROVE — both prior concerns addressed, zero unresolved threads.
✅ Enqueue now takes ctx context.Context with case <-ctx.Done(): return ctx.Err() — backpressure escape hatch in place.
✅ droppedRecords incremented in flush() on send failure — metric now reflects actual data loss.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: prior issues resolved, 1 unresolved thread blocks approval.
Prior review had 2 concerns. Status:
✅ exporter/loadbalancingexporter/log_batcher.go — Enqueue now takes ctx context.Context with case <-ctx.Done() escape: fixed.
✅ exporter/loadbalancingexporter/log_batcher.go — droppedRecords.Add(ctx, int64(records), ...) now called in flush error branch: fixed.
Code is clean. Would approve, but 1 unresolved review thread remains. Resolve all threads before merge.
There was a problem hiding this comment.
[ARCH-REVIEW] ✅ All prior concerns addressed. Approved.
Prior review had 2 concerns:
✅ Enqueue now accepts ctx with <-ctx.Done() select case — callers can cancel on slow backends.
✅ droppedRecords metric now incremented on send failure in flush().
Unresolved threads: 0

loadbalancingexporter: Batch logs after routing
Adds optional per-backend log batching after routing with backward-compatible default-off behavior.
log_batcherconfig with sane defaults while preserving the legacy path when disabledNote
Medium Risk
Changes exporter lifecycle/routing concurrency and adds an async per-backend log batching path, which could affect shutdown/rolling-update behavior and delivery ordering/latency when enabled. Default-off mitigates runtime impact, but queue/consume gating refactors touch traces/metrics/logs paths.
Overview
Adds a new
log_batcherconfiguration toloadbalancingexporter(default disabled) that buffers logs per resolved backend and flushes onmax_records,max_bytes(serialized OTLP pre-compression),flush_interval, shutdown, or backend removal.Refactors load balancer and wrapped exporter lifecycle to support safe draining on resolver churn (remove-under-lock + drain hook), introduces consume start/stop gating to avoid leaks on early returns, and updates queue handling to use
xexporterhelper.WithQueueBatchwith payload codec encoding;sending_queue.compress_in_memoryis now a hard validation error.Updates docs/tests accordingly, plus minor repo maintenance (codecov + issue templates + tidylist entries, hotreload processor metadata/docs regeneration, and toolchain/go.sum updates).
Written by Cursor Bugbot for commit 504e89f. This will update automatically on new commits. Configure here.
Summary by cubic
Adds optional post-routing, per-backend log batching in
loadbalancingexporterto reduce small RPCs and CPU; default is off. Implements SAW-6744 and restores queue payload codec support viaxexporterhelperwhile preserving existing queue compression behavior.New Features
max_records, serialized OTLPmax_bytes(pre-compression),flush_interval, shutdown, or resolver removal.log_batcherconfig with strict validation and defaults (512,1 MiB,100ms); legacy direct-send path remains when disabled.Bug Fixes
xexporterhelper.WithQueueBatchwith a payload-encoding wrapper for queue compression; makesending_queue.compress_in_memorya hard validation error.log_batcherREADME, regeneratedhotreloadprocessorandlogstometricsprocessormetadata/docs/tests, added Codecov component mappings and module listings; no functional changes.Written for commit 504e89f. Summary will update on new commits.
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests
Chores