Skip to content

feat(awss3exporter): port legacy fork capabilities (SAW-7101)#46

Merged
ishaish103 merged 7 commits into
mainfrom
fix/SAW-7101-port-legacy-awss3-capabilities
Apr 17, 2026
Merged

feat(awss3exporter): port legacy fork capabilities (SAW-7101)#46
ishaish103 merged 7 commits into
mainfrom
fix/SAW-7101-port-legacy-awss3-capabilities

Conversation

@ishaish103

@ishaish103 ishaish103 commented Apr 17, 2026

Copy link
Copy Markdown

Summary

Ports 4 missing capabilities from the old sawmills-collector local awss3exporter fork, fixing a production issue where insights stopped working and an S3 cost regression.

Gap 1: Signal type in S3 path (fixes insights)

  • {prefix}/logs/year=2026/... instead of {prefix}/year=2026/.../logs_file

Gap 2: Timer-based batching

  • Flushes at minute/hour boundaries → one file per window, reduces S3 PUT costs

Gap 3: jsonBatchingMarshaler

  • JSONL batching with configurable max file size (max_file_size_bytes)

Gap 4: Flush interfaces

  • logFlusher/traceFlusher for marshalers and encoding extensions

All gated behind legacy config. Standard contrib paths unaffected.

  • Tests pass, build clean, vet clean
  • Local MinIO test verified /logs/ in S3 path

Refs: SAW-7101, SAW-7102

🤖 Generated with Claude Code


Summary by cubic

Ports legacy fork capabilities into awss3exporter to restore Insights S3 routing and reduce S3 PUTs. Fixes SAW-7101 with {prefix}/{signal}/... keys, default JSONL batching for otlp_json (256MB), and timer-aligned flushes matching the old fork.

  • New Features

    • Adds {signal} S3 path segment: {prefix}/logs|metrics|traces/year=... (restores Insights routing).
    • Optional metadata segment from resource attrs before time folders.
    • JSONL batching for logs/traces with size trigger (max_file_size_bytes, default 256 MiB); optional timer flush at minute/hour boundaries when s3_partition is set; default S3 partition format is hour-level.
  • Bug Fixes

    • Timer uses s3_partition_timezone (defaults to Local) and advances from the previous boundary to avoid drift.
    • Flushes pending batches on shutdown; guarded by sync.Once.
    • Trace batching outputs one OTLP JSON document per line (per resourceSpan); new flush interfaces pass timer/shutdown reasons to marshalers and extensions.

Written for commit e739444. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Configurable maximum file size threshold for S3 batch uploads.
    • JSON batching for logs and traces to reduce upload frequency (with automatic flush when size exceeded).
    • Automatic periodic flushing of batches aligned to minute/hour boundaries.
  • Improvements

    • Exporter lifecycle: explicit shutdown hooks and final flush on shutdown.
    • S3 key structure updated to include metadata as an additional partition segment.
  • Tests

    • Added tests validating JSON batching and flush behavior for logs and traces.

Port 4 missing capabilities from the old sawmills-collector local fork
to the contrib fork, fixing insights and S3 cost regression:

1. Signal type (logs/metrics/traces) as S3 path segment — the insights
   engine expects {prefix}/{signal}/{time}/{file}. Previously the signal
   was only in the filename.

2. Timer-based batching (runEvery) — accumulates data and flushes at
   minute/hour partition boundaries, producing one large file per window
   instead of many small files. Reduces S3 PUT costs.

3. jsonBatchingMarshaler — batches log/trace records into JSONL format
   with configurable max file size (max_file_size_bytes). Uses Lock
   instead of TryLock for flush safety.

4. Flush interfaces (logFlusher/traceFlusher) — allows marshalers and
   encoding extensions to support periodic flush via type assertion.

All new behavior is gated behind legacy config (s3_partition, max_file_size_bytes).
Standard contrib code paths are unaffected.

Refs: SAW-7101, SAW-7102

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@sawmills-staging sawmills-staging Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Advisory: [LOW] flushLogBatch() and flushTraceBatch() each emit a Debug log on every invocation. Both are called from two paths: (1) inline inside MarshalLogs/MarshalTraces whenever uncompressedSize >= maxBatchSize — firing on every size-threshold crossing during hot-path ingestion — and (2) from the timer goroutine (runEvery → flushMarshaler → FlushLogs/FlushTraces) once per minute or hour. The fields logged (batch_count, uncompressed_size) duplicate information already observable at the upload layer (S3 object key, upload success/error). When an operator enables debug logging to diagnose a real issue, these two calls become the dominant log stream, masking more actionable signals. The org pipeline routes logs through an awsS3 destination, so any debug-level log drain that reaches a log-indexing destination (datadog or otlpHttp destinations in the same pipeline) would also inflate ingestion volume.
Action: Remove both Debug calls from the internal flushLogBatch() and flushTraceBatch() helpers. If flush confirmation is needed for debugging, emit a single Debug log at the FlushLogs()/FlushTraces() public entry points (timer and shutdown paths only), not inside the internal helpers that are also called on every size-threshold crossing. Alternatively, add a flush counter metric (e.g. flush_count{reason='size_threshold|timer|shutdown'}) so flush frequency is observable without per-flush log noise.
Location: exporter/awss3exporter/json_batching_marshaler.go:100.

@coderabbitai

coderabbitai Bot commented Apr 17, 2026

Copy link
Copy Markdown

Walkthrough

Adds an optional JSON batching marshaler triggered by a new MaxFileSizeBytes config, periodic and shutdown-triggered flush logic, marshaler flush hooks, and changes S3 partition key construction to include metadata as a path segment.

Changes

Cohort / File(s) Summary
Configuration & Factory
exporter/awss3exporter/config.go, exporter/awss3exporter/factory.go
Added MaxFileSizeBytes config field (max_file_size_bytes) and registered exporter shutdown hooks via exporterhelper.WithShutdown.
JSON Batching Marshaler
exporter/awss3exporter/json_batching_marshaler.go, exporter/awss3exporter/json_batching_marshaler_test.go
New jsonBatchingMarshaler type with in-memory batching for logs/traces, size-triggered and explicit flush APIs; tests validating batching, JSONL output, and auto-flush on size.
Marshaler Framework & S3 Marshaler
exporter/awss3exporter/marshaler.go, exporter/awss3exporter/s3_marshaler.go
Added optional logFlusher/traceFlusher interfaces, newMarshalerWithConfig to create batching JSON marshaler when configured, and FlushLogs/FlushTraces delegation on s3Marshaler.
Exporter Lifecycle & Flush Logic
exporter/awss3exporter/exporter.go
Introduced done channel, shutdown method to trigger final flush, periodic timer goroutine that triggers flushes aligned to minute/hour boundaries, and flushMarshaler helper that uploads non-empty flush payloads with metadata.
Partition Key & Writer Tests
exporter/awss3exporter/internal/upload/partition.go, exporter/awss3exporter/internal/upload/partition_test.go, exporter/awss3exporter/internal/upload/writer_test.go
Changed partition key prefix to insert Metadata as a dedicated path segment; updated tests and expected object key assertions (including added noop partition segment in some cases).

Sequence Diagram

sequenceDiagram
    participant Client as Client
    participant Exporter as S3 Exporter
    participant Marshaler as JSON Batching Marshaler
    participant Buffer as In-Memory Buffer
    participant S3 as S3 Uploader
    participant Timer as Periodic Timer

    Client->>Exporter: Send telemetry (logs/traces)
    Exporter->>Marshaler: MarshalLogs/MarshalTraces
    Marshaler->>Marshaler: JSON encode entry
    Marshaler->>Buffer: Append entry (protected by mutex)
    Marshaler->>Marshaler: Update size counter
    alt size >= MaxFileSizeBytes
        Marshaler->>Marshaler: flush batch -> produce JSONL
        Marshaler->>Exporter: return payload for upload
        Exporter->>S3: Upload(payload, flushMetadata{reason:"size"})
        S3-->>Exporter: upload result
    else buffered (no immediate upload)
        Marshaler-->>Exporter: nil (buffered)
    end

    Timer->>Exporter: tick (aligned minute/hour)
    Exporter->>Marshaler: FlushLogs/FlushTraces
    Marshaler->>Marshaler: produce JSONL if any
    Marshaler->>Exporter: payload
    Exporter->>S3: Upload(payload, flushMetadata{reason:"timer"})
    S3-->>Exporter: upload result

    Client->>Exporter: Shutdown
    Exporter->>Exporter: close done
    Exporter->>Marshaler: FlushLogs/FlushTraces (reason: "shutdown")
    Marshaler->>Exporter: final payload(s)
    Exporter->>S3: Upload(final payloads, flushMetadata{reason:"shutdown"})
    S3-->>Exporter: upload result
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • amir-jakoby
  • sawmills-architect-review
  • cubic-dev-ai

Poem

🐰
I nibble bytes into a pile so neat,
JSON lines that dance and meet,
A timer taps, a final shove—
To S3 they hop, all snug and glove,
Metadata marks the trail I beat!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically summarizes the main change: porting legacy fork capabilities to the awss3exporter to address production issues (SAW-7101).
Description check ✅ Passed PR description includes all required sections: summary with business context, technical details of changes, testing/verification, and issue references.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/SAW-7101-port-legacy-awss3-capabilities

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sawmills-architect-review sawmills-architect-review Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] REQUEST_CHANGES

Ports the awss3exporter legacy batching and partition changes, but two regressions block merge: buffered flushes lose resource-based S3 routing, and the timer loop can skip partition boundaries when a flush runs long.

Comment thread exporter/awss3exporter/exporter.go
Comment thread exporter/awss3exporter/exporter.go Outdated

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cubic analysis

2 issues found across 9 files

Confidence score: 2/5

  • There is a clear regression risk in exporter/awss3exporter/json_batching_marshaler.go: trace batching emits resourceSpans fragments without the OTLP JSON envelope, which can produce invalid OTLP JSON output for downstream consumers.
  • exporter/awss3exporter/exporter.go schedules timer-based flushes in local time rather than the configured S3 partition timezone, so data can land in misaligned partition windows and affect partitioned ingestion/query behavior.
  • Given two medium-high severity, high-confidence behavioral issues that are user-facing, this is higher merge risk until addressed.
  • Pay close attention to exporter/awss3exporter/json_batching_marshaler.go and exporter/awss3exporter/exporter.go - invalid JSON batching and timezone-misaligned flush timing can both impact data correctness.
Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="exporter/awss3exporter/exporter.go">

<violation number="1" location="exporter/awss3exporter/exporter.go:161">
P2: Timer-based flush is scheduled in local time instead of the configured S3 partition timezone, so flush windows can be misaligned with S3 partition windows.</violation>
</file>

<file name="exporter/awss3exporter/json_batching_marshaler.go">

<violation number="1" location="exporter/awss3exporter/json_batching_marshaler.go:133">
P1: Trace batching drops the OTLP JSON envelope and emits `resourceSpans` fragments instead of valid OTLP JSON documents.</violation>
</file>

Linked issue analysis

Linked issue: SAW-7101: Insights stopped working with new collector version

Status Acceptance criteria Notes
Signal type in S3 path so sampled data routes to the expected S3 folder (Gap 1) Appended Metadata into S3 path builder to include signal type
Timer-based batching that flushes at minute/hour boundaries (Gap 2) Added runEvery that triggers flushes on minute/hour boundaries
jsonBatchingMarshaler for JSONL batching with configurable max_file_size_bytes (Gap 3) Added JSON batching marshaler and config field to control max size
Flush interfaces (logFlusher / traceFlusher) for marshalers and encoding extensions (Gap 4) Introduced flusher interfaces and wired them into marshalers
⚠️ Legacy behavior gated behind legacy config so standard contrib paths unaffected No explicit legacy config flag or clear conditional seen in diffs
Shutdown handling to stop timers and flush marshaler on shutdown Added shutdown(), closed done channel and registered WithShutdown
Tests updated to reflect new S3 path behavior (and local MinIO path verification) Unit tests updated to expect new path layout with service/noop prefix
Config exposes MaxFileSizeBytes for json batching Config struct now includes MaxFileSizeBytes field
Flushed marshaler buffers are uploaded to S3 when non-empty flushMarshaler uploads non-empty buffers via uploadBuffer

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review, or fix all with cubic.

Comment thread exporter/awss3exporter/json_batching_marshaler.go Outdated
Comment thread exporter/awss3exporter/exporter.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (9)
exporter/awss3exporter/internal/upload/partition_test.go (1)

96-96: Expectations match the new bucketKeyPrefix behavior.

Test coverage exercises the new Metadata segment across the meaningful matrix: compression on/off, overridePrefix set/empty, PartitionBasePrefix alone and combined with PartitionPrefix. No gaps apparent.

Optional: consider adding one case where Metadata is explicitly empty but other fields are populated, to lock in the "Metadata-not-set → no extra segment" backward-compatibility contract at the Build() level (currently only covered indirectly via TestPartitionKeyInputsBucketPrefix).

Also applies to: 112-112, 128-128, 143-143, 159-159, 175-175, 191-191

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/internal/upload/partition_test.go` at line 96, Add a
test table case in partition_test.go that sets Metadata to an explicit empty
value while populating other fields (e.g., overridePrefix, PartitionBasePrefix,
PartitionPrefix, compression flag) and assert the resulting
bucketKeyPrefix/expect path does NOT include a Metadata segment; locate the
table used by the existing partition key tests and add this row so Build() is
exercised with Metadata == "" to lock in the backward-compatibility contract.
exporter/awss3exporter/exporter.go (3)

121-123: Error from newMarshalerWithConfig is discarded.

newMarshalerWithConfig forwards errors from newMarshaler (e.g., ErrUnknownMarshaler), but you rewrite it to unknown marshaler %q — losing the underlying cause. Wrap instead.

♻️ Proposed fix
-	} else if e.config.MaxFileSizeBytes > 0 {
-		if m, err = newMarshalerWithConfig(e.config.MarshalerName, e.config.MaxFileSizeBytes, e.logger); err != nil {
-			return fmt.Errorf("unknown marshaler %q", e.config.MarshalerName)
-		}
+	} else if e.config.MaxFileSizeBytes > 0 {
+		if m, err = newMarshalerWithConfig(e.config.MarshalerName, e.config.MaxFileSizeBytes, e.logger); err != nil {
+			return fmt.Errorf("unknown marshaler %q: %w", e.config.MarshalerName, err)
+		}

(The pre-existing else branch at line 126 has the same issue and could be updated consistently.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/exporter.go` around lines 121 - 123, The current
return when calling newMarshalerWithConfig discards the original error (e.g.,
ErrUnknownMarshaler); change the error handling to wrap the underlying error
instead of replacing it so callers can inspect the cause — for example, return
fmt.Errorf("unknown marshaler %q: %w", e.config.MarshalerName, err) (or use
errors.Wrap) when newMarshalerWithConfig returns an error, and apply the same
wrapping fix to the other branch that rewrites errors (the else branch that
handles marshaler creation).

212-215: shutdown is not idempotent — a second invocation panics on close(e.done).

The collector normally calls Shutdown only once, but defense-in-depth is cheap and avoids hard-to-diagnose panics if the hook is invoked twice (e.g., in tests or wrappers).

♻️ Optional hardening
 type s3Exporter struct {
 	...
 	done       chan struct{}
+	doneOnce   sync.Once
 }

 func (e *s3Exporter) shutdown(ctx context.Context) error {
-	close(e.done)
+	e.doneOnce.Do(func() { close(e.done) })
 	return e.flushMarshaler(ctx, "shutdown")
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/exporter.go` around lines 212 - 215, shutdown
currently closes the done channel unconditionally which panics if called twice;
make closing idempotent by protecting the close with a sync.Once (add a field
like closeOnce sync.Once to s3Exporter) and replace close(e.done) with
closeOnce.Do(func(){ close(e.done) }), then keep the existing call to
e.flushMarshaler(ctx, "shutdown") so shutdown remains functional but safe on
repeated invocations.

137-179: Timer-based flushing uses Start context — fragile pattern that could fail with future collector versions.

The ctx passed to Start(ctx, host) is intended for startup operations. While current OTel collector implementations keep this context alive until shutdown, the behavior is not guaranteed — future versions may add startup timeouts. Here, that same ctx is passed as the parent to sleepCtx (line 163) and to flushMarshaler (line 146), coupling the long-running periodic flush to the startup phase context.

Use context.Background() as the parent for timer-based operations and rely on e.done for graceful termination:

  1. Remove the ctx parameter from runEvery signature and pass context.Background() to both sleepCtx and task().
  2. Replace context.WithTimeout(ctx, ...) + select on sleepCtx.Done() with time.NewTimer(...) — simpler and avoids context layering.
  3. Optional: Guard close(e.done) in Shutdown with sync.Once to prevent panic on double-invocation.
Example refactor
 	// Launch timer-based flushing when the marshaler supports flushing and a
 	// partition cadence is configured.
 	if e.config.S3Uploader.S3Partition != "" {
 		if _, ok := m.(logFlusher); ok {
-			go e.runEvery(ctx, e.config.S3Uploader.S3Partition, func() {
-				if err := e.flushMarshaler(ctx, "timer"); err != nil {
+			go e.runEvery(e.config.S3Uploader.S3Partition, func(tctx context.Context) {
+				if err := e.flushMarshaler(tctx, "timer"); err != nil {
 					e.logger.Error("Failed to flush S3 exporter", zap.Error(err))
 				}
 			})
 		}
 	}
 	return nil
 }

-func (e *s3Exporter) runEvery(ctx context.Context, s3Partition string, task func()) {
+func (e *s3Exporter) runEvery(s3Partition string, task func(context.Context)) {
 	var getNextTick func(time.Time) time.Time
 	switch s3Partition {
 	case "minute":
 		getNextTick = func(t time.Time) time.Time { return t.Truncate(time.Minute).Add(time.Minute) }
 	default:
 		getNextTick = func(t time.Time) time.Time { return t.Truncate(time.Hour).Add(time.Hour) }
 	}

 	next := getNextTick(time.Now())
 	for {
-		sleepCtx, cancel := context.WithTimeout(ctx, time.Until(next))
-		select {
-		case <-sleepCtx.Done():
-			if sleepCtx.Err() == context.DeadlineExceeded {
-				cancel()
-				task()
-			} else {
-				cancel()
-				return
-			}
-		case <-e.done:
-			cancel()
+		timer := time.NewTimer(time.Until(next))
+		select {
+		case <-timer.C:
+			task(context.Background())
+		case <-e.done:
+			timer.Stop()
 			return
 		}
 		next = getNextTick(time.Now())
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/exporter.go` around lines 137 - 179, The periodic
flush ties long-lived work to the Start(ctx, host) startup context; remove that
fragility by changing runEvery to not accept the startup ctx (remove the ctx
parameter on runEvery and the goroutine call), use context.Background() as the
parent for any background operations invoked by runEvery (invoke task() directly
or with context.Background() if it needs a ctx), replace the
context.WithTimeout(...) + select logic in runEvery with
time.NewTimer(time.Until(next)) and a select on timer.C and e.done (stop/reset
the timer appropriately), and make Shutdown guard close(e.done) with sync.Once
to avoid double-close panics; references: runEvery, flushMarshaler (caller),
Start (goroutine launch), e.done, and Shutdown.
exporter/awss3exporter/config.go (1)

117-121: Add validation for MaxFileSizeBytes and document precedence.

Two small gaps worth addressing:

  1. Validate() does not reject negative values for MaxFileSizeBytes. A negative value is silently treated as "no batching" (since the gate is > 0), hiding a likely misconfiguration.
  2. The field doc says batching is used when MarshalerName == OtlpJSON, but in exporter/awss3exporter/exporter.go start() the batching path is only taken when Encoding == nil. When a user sets both encoding and max_file_size_bytes, batching is silently ignored. Worth noting here.
♻️ Proposed doc/validation tweak
 	// MaxFileSizeBytes sets the uncompressed size threshold at which the
 	// jsonBatchingMarshaler flushes its buffer to S3. When > 0 and
-	// MarshalerName is OtlpJSON, the batching marshaler is used instead
-	// of the default pass-through marshaler.
+	// MarshalerName is OtlpJSON (and Encoding is unset), the batching
+	// marshaler is used instead of the default pass-through marshaler.
+	// Must be >= 0.
 	MaxFileSizeBytes int `mapstructure:"max_file_size_bytes"`

And in Validate():

+	if c.MaxFileSizeBytes < 0 {
+		errs = multierr.Append(errs, errors.New("max_file_size_bytes must be non-negative"))
+	}

Also applies to: 139-207

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/config.go` around lines 117 - 121, Validate that
MaxFileSizeBytes is non-negative in the config's Validate() (reject values < 0
with a clear error referencing MaxFileSizeBytes) and update the field
documentation to explicitly state precedence: batching (jsonBatchingMarshaler)
only applies when MarshalerName == OtlpJSON AND Encoding == nil (i.e., an
explicit Encoding setting overrides/max_file_size_bytes is ignored). Mention
these symbols in the message: MaxFileSizeBytes, MarshalerName, Encoding,
Validate(), and the start() batching branch in
exporter/awss3exporter/exporter.go so reviewers can find and align validation
and docs with the runtime check.
exporter/awss3exporter/json_batching_marshaler.go (3)

20-36: Shared uncompressedSize counter between logs and traces is fragile.

uncompressedSize is a single field used by both MarshalLogs/flushLogBatch and MarshalTraces/flushTraceBatch. flushLogBatch resets the counter to 0 — if a future refactor (or an accidental wiring) ever routes both logs and traces through the same marshaler instance, log flushes will zero out the trace accounting and vice versa, producing incorrect thresholds.

Today the factory creates one s3Exporter (and thus one marshaler) per signal, so this is latent. Consider splitting into logsSize and tracesSize for defense against future regressions.

♻️ Suggested split
-	uncompressedSize int
-	maxBatchSize     int
+	logsUncompressedSize   int
+	tracesUncompressedSize int
+	maxBatchSize           int

and use each in its corresponding path.

Also applies to: 87-105, 150-168

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/json_batching_marshaler.go` around lines 20 - 36, The
shared uncompressedSize field is used by both log and trace paths and must be
split to avoid cross-signal interference: add two separate counters (e.g.,
logsUncompressedSize and tracesUncompressedSize) to the same struct where
uncompressedSize currently lives, update MarshalLogs and flushLogBatch to
increment/reset only logsUncompressedSize, update MarshalTraces and
flushTraceBatch to increment/reset only tracesUncompressedSize, ensure all
accesses remain protected by the existing mutex, and initialize the new fields
where the exporter is created.

51-69: Asymmetry between logs and traces batching granularity.

Logs are batched at per-MarshalLogs-call granularity (the entire plog.Logs becomes one JSONL line regardless of how many ResourceLogs it contains), while traces are split per-ResourceSpans element. Besides being inconsistent, long-tail log batches can produce very large JSONL lines that are harder to consume downstream. Consider splitting logs per ResourceLogs as well (see the same pdata-iteration approach suggested for traces).

Also applies to: 113-148

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/json_batching_marshaler.go` around lines 51 - 69, The
current MarshalLogs in jsonBatchingMarshaler treats the entire plog.Logs as one
JSONL entry causing oversized lines; change MarshalLogs (and the similar block
at lines 113-148) to iterate over ld.ResourceLogs() and for each ResourceLogs
call m.logsMarshaler.MarshalLogs for that single ResourceLogs (or marshal the
ResourceLogs element directly), append each resulting json.RawMessage to
m.logBatches, increment m.uncompressedSize and m.logsBatchCount per
ResourceLogs, and after adding each element check if m.uncompressedSize >=
m.maxBatchSize and call m.flushLogBatch() as needed; keep the mutex locking
around these operations and ensure the method returns either flushed bytes or
empty slice consistent with flushLogBatch behavior.

122-122: Use any instead of interface{} for Go 1.18+ consistency.

This refactoring aligns with the Go 1.18+ idiom and the prevailing style in the AWS S3 exporter package, which already uses any in other files.

-	var otlpData map[string]interface{}
+	var otlpData map[string]any

Also on line 127: []interface{}[]any.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/json_batching_marshaler.go` at line 122, Replace uses
of the empty interface with Go 1.18+ alias `any` for consistency: change the
declaration of otlpData from `map[string]interface{}` to `map[string]any` and
change any occurrences of `[]interface{}` (notably the slice on line 127) to
`[]any`; update any related type assertions or casts in the surrounding function
(the variable `otlpData` and the slice usage) to use `any` accordingly so the
code compiles with the new type alias.
exporter/awss3exporter/marshaler.go (1)

56-61: Silent fallback when max_file_size_bytes is set with a non-JSON marshaler.

If a user configures max_file_size_bytes > 0 but MarshalerName is anything other than OtlpJSON (e.g., otlp_proto, sumo_ic, body), batching is silently disabled and the user gets the default pass-through behavior without any indication. Consider either logging a warning here or rejecting the combination in Config.Validate() so the misconfiguration is visible.

♻️ Optional log-warning suggestion
 func newMarshalerWithConfig(mType MarshalerType, maxBatchSize int, logger *zap.Logger) (marshaler, error) {
 	if mType == OtlpJSON && maxBatchSize > 0 {
 		return newJSONBatchingMarshaler(maxBatchSize, logger), nil
 	}
+	if maxBatchSize > 0 && logger != nil {
+		logger.Warn("max_file_size_bytes is only honored when marshaler is otlp_json; batching is disabled",
+			zap.String("marshaler", string(mType)))
+	}
 	return newMarshaler(mType, logger)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/marshaler.go` around lines 56 - 61, The current
newMarshalerWithConfig(mType MarshalerType, maxBatchSize int, logger
*zap.Logger) silently disables batching when maxBatchSize>0 but mType !=
OtlpJSON; either add an explicit warning inside newMarshalerWithConfig (use
logger.Warn) when maxBatchSize>0 and mType != OtlpJSON stating that batching is
only supported for OtlpJSON and you're falling back to newMarshaler, or
(preferable) enforce this constraint earlier by adding a validation check in
Config.Validate() that returns an error if Config.MaxFileSizeBytes>0 and
Config.MarshalerName != "OtlpJSON" so the misconfiguration is surfaced to the
user; reference newJSONBatchingMarshaler and newMarshaler when implementing the
warning/validation message.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@exporter/awss3exporter/json_batching_marshaler.go`:
- Around line 1-186: Add unit tests covering jsonBatchingMarshaler behavior:
write tests that call MarshalLogs and MarshalTraces to verify they buffer and
return empty when uncompressedSize < maxBatchSize and that they return JSONL
when the threshold is crossed (exercise jsonBatchingMarshaler.MarshalLogs,
MarshalTraces, and createJSONL indirectly); test FlushLogs and FlushTraces force
a flush and reset internal state (logsBatchCount/tracesBatchCount and
uncompressedSize); add a concurrency test that runs MarshalLogs concurrently
with FlushLogs under -race to ensure no races; and add integration-style tests
for exporter runEvery/flushMarshaler/shutdown in exporter.go to verify shutdown
triggers a final flush and stops the periodic ticker. Use the concrete types and
methods named jsonBatchingMarshaler, MarshalLogs, MarshalTraces, FlushLogs,
FlushTraces, runEvery, flushMarshaler, and shutdown to locate and exercise the
logic.
- Around line 113-148: The MarshalTraces method in jsonBatchingMarshaler
currently does an expensive map-based JSON round-trip via tracesMarshaler
output; change it to iterate the incoming ptrace.Traces via td.ResourceSpans()
(use the pdata API) and marshal each resource-spans entry directly into the
traceBatches slice, updating traceBatches, tracesBatchCount, and
uncompressedSize, and invoking flushTraceBatch() when uncompressedSize >=
maxBatchSize; remove the json.Unmarshal/map[string]interface{} and the
re-marshaling of whole output to avoid allocations and numeric coercion.

In `@exporter/awss3exporter/s3_marshaler.go`:
- Around line 69-81: The s3Marshaler methods FlushLogs/FlushTraces make every
s3Marshaler satisfy logFlusher/traceFlusher and break the feature-detection in
exporter.go; either remove these methods from s3Marshaler and implement them
only on the batching marshaler and the extension-wrapping type that actually
buffer data, or keep the methods but change exporter.go to gate on
marshaler.extLogFlusher != nil / marshaler.extTraceFlusher (or a new
hasBufferedData interface implemented only by jsonBatchingMarshaler and the
extension-backed s3Marshaler) before launching runEvery/flushMarshaler; update
references to s3Marshaler, FlushLogs, FlushTraces, extLogFlusher,
extTraceFlusher, and the type assertions m.(logFlusher)/m.(traceFlusher)
accordingly.

---

Nitpick comments:
In `@exporter/awss3exporter/config.go`:
- Around line 117-121: Validate that MaxFileSizeBytes is non-negative in the
config's Validate() (reject values < 0 with a clear error referencing
MaxFileSizeBytes) and update the field documentation to explicitly state
precedence: batching (jsonBatchingMarshaler) only applies when MarshalerName ==
OtlpJSON AND Encoding == nil (i.e., an explicit Encoding setting
overrides/max_file_size_bytes is ignored). Mention these symbols in the message:
MaxFileSizeBytes, MarshalerName, Encoding, Validate(), and the start() batching
branch in exporter/awss3exporter/exporter.go so reviewers can find and align
validation and docs with the runtime check.

In `@exporter/awss3exporter/exporter.go`:
- Around line 121-123: The current return when calling newMarshalerWithConfig
discards the original error (e.g., ErrUnknownMarshaler); change the error
handling to wrap the underlying error instead of replacing it so callers can
inspect the cause — for example, return fmt.Errorf("unknown marshaler %q: %w",
e.config.MarshalerName, err) (or use errors.Wrap) when newMarshalerWithConfig
returns an error, and apply the same wrapping fix to the other branch that
rewrites errors (the else branch that handles marshaler creation).
- Around line 212-215: shutdown currently closes the done channel
unconditionally which panics if called twice; make closing idempotent by
protecting the close with a sync.Once (add a field like closeOnce sync.Once to
s3Exporter) and replace close(e.done) with closeOnce.Do(func(){ close(e.done)
}), then keep the existing call to e.flushMarshaler(ctx, "shutdown") so shutdown
remains functional but safe on repeated invocations.
- Around line 137-179: The periodic flush ties long-lived work to the Start(ctx,
host) startup context; remove that fragility by changing runEvery to not accept
the startup ctx (remove the ctx parameter on runEvery and the goroutine call),
use context.Background() as the parent for any background operations invoked by
runEvery (invoke task() directly or with context.Background() if it needs a
ctx), replace the context.WithTimeout(...) + select logic in runEvery with
time.NewTimer(time.Until(next)) and a select on timer.C and e.done (stop/reset
the timer appropriately), and make Shutdown guard close(e.done) with sync.Once
to avoid double-close panics; references: runEvery, flushMarshaler (caller),
Start (goroutine launch), e.done, and Shutdown.

In `@exporter/awss3exporter/internal/upload/partition_test.go`:
- Line 96: Add a test table case in partition_test.go that sets Metadata to an
explicit empty value while populating other fields (e.g., overridePrefix,
PartitionBasePrefix, PartitionPrefix, compression flag) and assert the resulting
bucketKeyPrefix/expect path does NOT include a Metadata segment; locate the
table used by the existing partition key tests and add this row so Build() is
exercised with Metadata == "" to lock in the backward-compatibility contract.

In `@exporter/awss3exporter/json_batching_marshaler.go`:
- Around line 20-36: The shared uncompressedSize field is used by both log and
trace paths and must be split to avoid cross-signal interference: add two
separate counters (e.g., logsUncompressedSize and tracesUncompressedSize) to the
same struct where uncompressedSize currently lives, update MarshalLogs and
flushLogBatch to increment/reset only logsUncompressedSize, update MarshalTraces
and flushTraceBatch to increment/reset only tracesUncompressedSize, ensure all
accesses remain protected by the existing mutex, and initialize the new fields
where the exporter is created.
- Around line 51-69: The current MarshalLogs in jsonBatchingMarshaler treats the
entire plog.Logs as one JSONL entry causing oversized lines; change MarshalLogs
(and the similar block at lines 113-148) to iterate over ld.ResourceLogs() and
for each ResourceLogs call m.logsMarshaler.MarshalLogs for that single
ResourceLogs (or marshal the ResourceLogs element directly), append each
resulting json.RawMessage to m.logBatches, increment m.uncompressedSize and
m.logsBatchCount per ResourceLogs, and after adding each element check if
m.uncompressedSize >= m.maxBatchSize and call m.flushLogBatch() as needed; keep
the mutex locking around these operations and ensure the method returns either
flushed bytes or empty slice consistent with flushLogBatch behavior.
- Line 122: Replace uses of the empty interface with Go 1.18+ alias `any` for
consistency: change the declaration of otlpData from `map[string]interface{}` to
`map[string]any` and change any occurrences of `[]interface{}` (notably the
slice on line 127) to `[]any`; update any related type assertions or casts in
the surrounding function (the variable `otlpData` and the slice usage) to use
`any` accordingly so the code compiles with the new type alias.

In `@exporter/awss3exporter/marshaler.go`:
- Around line 56-61: The current newMarshalerWithConfig(mType MarshalerType,
maxBatchSize int, logger *zap.Logger) silently disables batching when
maxBatchSize>0 but mType != OtlpJSON; either add an explicit warning inside
newMarshalerWithConfig (use logger.Warn) when maxBatchSize>0 and mType !=
OtlpJSON stating that batching is only supported for OtlpJSON and you're falling
back to newMarshaler, or (preferable) enforce this constraint earlier by adding
a validation check in Config.Validate() that returns an error if
Config.MaxFileSizeBytes>0 and Config.MarshalerName != "OtlpJSON" so the
misconfiguration is surfaced to the user; reference newJSONBatchingMarshaler and
newMarshaler when implementing the warning/validation message.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 50facedc-710d-488a-a705-d8adb05ae942

📥 Commits

Reviewing files that changed from the base of the PR and between 4125f03 and a16d480.

📒 Files selected for processing (9)
  • exporter/awss3exporter/config.go
  • exporter/awss3exporter/exporter.go
  • exporter/awss3exporter/factory.go
  • exporter/awss3exporter/internal/upload/partition.go
  • exporter/awss3exporter/internal/upload/partition_test.go
  • exporter/awss3exporter/internal/upload/writer_test.go
  • exporter/awss3exporter/json_batching_marshaler.go
  • exporter/awss3exporter/marshaler.go
  • exporter/awss3exporter/s3_marshaler.go

Comment thread exporter/awss3exporter/json_batching_marshaler.go
Comment thread exporter/awss3exporter/json_batching_marshaler.go
Comment thread exporter/awss3exporter/s3_marshaler.go
- Fix timer boundary skip: advance from previous boundary, not time.Now()
- Fix timer timezone: use configured S3PartitionTimezone (default UTC)
- Fix trace batching: store full OTLP JSON documents, not fragments
- Remove expensive JSON round-trip in MarshalTraces
- Add comments clarifying nil upload opts for timer flushes
- Add json_batching_marshaler_test.go with 6 tests

Refs: SAW-7101

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@sawmills-staging sawmills-staging Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared with previous telemetry review: 0 new, 1 still open, 0 fixed, 0 accepted.

Advisory: [LOW] flushLogBatch() and flushTraceBatch() each emit a Debug log on every invocation. Both helpers are called from two paths: (1) inline inside MarshalLogs/MarshalTraces whenever uncompressedSize >= maxBatchSize — firing on every size-threshold crossing during hot-path ingestion — and (2) from the timer goroutine (runEvery → flushMarshaler → FlushLogs/FlushTraces) once per minute or hour. The fields logged (batch_count, uncompressed_size) duplicate information already observable through the rich set of OTel metrics introduced in the same change (otelcol_exporter_awss3_flush_complete_total, otelcol_exporter_awss3_upload_bytes, otelcol_exporter_awss3_upload_object_size). When an operator enables debug logging to diagnose a real issue, these two calls become the dominant log stream, masking more actionable signals. The org pipeline routes logs to log-indexing destinations (datadog, otlpHttp) in addition to the awsS3 destination, so any debug-level log drain that reaches those indexing destinations would also inflate ingestion volume.
Action: Remove both Debug calls from the internal flushLogBatch() and flushTraceBatch() helpers. If flush confirmation is needed for debugging, emit a single Debug log at the FlushLogs()/FlushTraces() public entry points (timer and shutdown paths only), not inside the internal helpers that are also called on every size-threshold crossing. The existing flush/upload metrics (flush_complete_total, upload_bytes, upload_object_size) already provide the same batch_count and size observability without per-flush log noise.
Location: exporter/awss3exporter/json_batching_marshaler.go:100.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 3 files (changes from recent commits).

Requires human review: This PR introduces significant new logic for timer-based batching and custom JSONL marshaling, including background goroutines and path structure changes that require human review.

@sawmills-architect-review sawmills-architect-review Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] REQUEST_CHANGES

Two blocking regressions remain: timer/shutdown flushes drop resource-based S3 routing, and the new timer schedule defaults to UTC even though partition keys default to local time.

Comment thread exporter/awss3exporter/exporter.go
Comment thread exporter/awss3exporter/exporter.go Outdated
…nKeyBuilder

The PartitionKeyBuilder defaults PartitionTimeLocation to time.Local when
unset. The timer must use the same default so flush boundaries align with
S3 partition keys.

Refs: SAW-7101

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 1 file (changes from recent commits).

Requires human review: This PR introduces significant new logic for batching, timer-based flushing, and S3 path construction. These are critical paths that impact data integrity and downstream consumers.

@amir-jakoby amir-jakoby left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] Re-review: REQUEST_CHANGES

The timer timezone fix is good. One blocking regression remains in the batching path: buffered flushes still drop resource-scoped S3 routing, so mixed-resource data can land under the wrong bucket/prefix.

Comment thread exporter/awss3exporter/json_batching_marshaler.go
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 1 file (changes from recent commits).

Requires human review: Significant logic changes including new batching mechanisms, background timer-based flushing, and S3 path structure modifications. Requires human review for concurrency and data integrity.

@sawmills-architect-review sawmills-architect-review Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] REQUEST_CHANGES

The prior blocking routing bug is still present. Buffered flushes still lose per-resource UploadOptions, so mixed-resource batches can land under the wrong bucket/prefix, and timer/shutdown flushes still have no route at all.
1 unresolved review thread remains.

Comment thread exporter/awss3exporter/json_batching_marshaler.go
The jsonBatchingMarshaler does not track per-resource upload options.
Resource-based S3 routing is handled by the batchperresourceattr wrapper
in factory.go which splits by resource attribute before the exporter.
This matches the old sawmills-collector fork behavior.

Refs: SAW-7101

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@sawmills-architect-review sawmills-architect-review Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] REQUEST_CHANGES

The routing regression is still open. The new comment claims batchperresourceattr gives each exporter a single resource prefix, but the exporter still buffers across multiple ConsumeLogs calls and flushMarshaler uploads buffered bytes with nil uploadOpts. Mixed-resource batches can still land under the wrong prefix on timer/shutdown flushes.

Comment thread exporter/awss3exporter/json_batching_marshaler.go Outdated

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 1 file (changes from recent commits).

Requires human review: Significant logic changes including a new batching marshaler, background timer-based flushing, and modifications to S3 path partitioning. High impact on data delivery.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
exporter/awss3exporter/exporter.go (1)

140-159: Timer startup only checks logFlusher — traces-only flushers will be skipped.

The goroutine that drives periodic flushes is gated solely on m.(logFlusher). Today this is fine because jsonBatchingMarshaler satisfies both logFlusher and traceFlusher, so a traces-pipeline exporter still passes this check. However, if an encoding extension (or a future marshaler) exposes only traceFlusher, periodic flushing will silently not start for a traces-only pipeline, and data will only ever flush on shutdown or when MaxFileSizeBytes is crossed. Start the ticker if either interface is satisfied:

♻️ Suggested fix
-	if e.config.S3Uploader.S3Partition != "" {
-		if _, ok := m.(logFlusher); ok {
+	if e.config.S3Uploader.S3Partition != "" {
+		_, hasLogFlusher := m.(logFlusher)
+		_, hasTraceFlusher := m.(traceFlusher)
+		if hasLogFlusher || hasTraceFlusher {
 			var loc *time.Location
 			...
 		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/exporter.go` around lines 140 - 159, The periodic
flush goroutine is only started when the marshaler implements logFlusher, which
will skip traces-only flushers; modify the check in the S3 partition timer
startup so it starts the ticker if the marshaler implements either logFlusher or
traceFlusher (i.e., replace the single type assertion on m.(logFlusher) with a
check that tests for logFlusher OR traceFlusher), then keep the existing time
zone logic and the call to go e.runEvery(..., func() { if err :=
e.flushMarshaler(ctx, "timer"); ... }) unchanged so traces-only marshalers will
also trigger periodic flushes.
exporter/awss3exporter/json_batching_marshaler.go (1)

25-55: Consider documenting / separating the shared uncompressedSize.

uncompressedSize is mutated by both the logs path (Line 68) and the traces path (Line 132), and either flush resets it (Lines 108, 156). In today's wiring each jsonBatchingMarshaler is constructed per signal type, so a single instance only ever buffers logs or traces, which keeps this correct. But the struct itself doesn't enforce that invariant — if this marshaler is ever reused for both signals in the same exporter (e.g., a future multi-signal pipeline), the shared counter would cause premature/missed auto-flushes. Either split into logsUncompressedSize / tracesUncompressedSize or add a comment calling out the "single-signal-per-instance" invariant.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@exporter/awss3exporter/json_batching_marshaler.go` around lines 25 - 55, The
struct-level uncompressedSize is shared between logs and traces and can cause
cross-signal flush bugs; split it into two distinct counters
logsUncompressedSize and tracesUncompressedSize on jsonBatchingMarshaler and
update all places that currently read/write uncompressedSize (the logs path and
traces path methods that increment the counter and the flush methods that reset
it) to use the appropriate new field, initialize them in
newJSONBatchingMarshaler, and ensure flush logic for logs only resets
logsUncompressedSize while trace flushes reset tracesUncompressedSize;
alternatively, if you prefer not to split, add a clear comment on
jsonBatchingMarshaler documenting the single-instance-per-signal invariant and
assert that invariant where the marshaler is created.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@exporter/awss3exporter/exporter.go`:
- Around line 120-123: The current branch calling
newMarshalerWithConfig(e.config.MarshalerName, e.config.MaxFileSizeBytes,
e.logger) discards the original err and returns a generic fmt.Errorf("unknown
marshaler %q", e.config.MarshalerName); change this to return a wrapped error
that includes the original err (e.g., fmt.Errorf("failed to create marshaler %q:
%w", e.config.MarshalerName, err)) so callers see the real failure (covers
issues beyond an unknown name such as invalid MaxFileSizeBytes or logger
problems); update the error return where newMarshalerWithConfig is invoked so
the original error is preserved and surfaced.
- Around line 233-236: shutdown currently closes e.done unguarded which will
panic if called twice; change s3Exporter to use a sync.Once (e.g., add a field
like shutdownOnce) and call shutdownOnce.Do(func() { close(e.done) }) inside
s3Exporter.shutdown, then still call and return e.flushMarshaler(ctx,
"shutdown") so shutdown remains idempotent; while here also audit flushMarshaler
and uploadBuffer to ensure they accept and honor the provided ctx for
cancellation/deadline during shutdown, updating uploadBuffer to abort/return
promptly on ctx.Done() if it does not already.

---

Nitpick comments:
In `@exporter/awss3exporter/exporter.go`:
- Around line 140-159: The periodic flush goroutine is only started when the
marshaler implements logFlusher, which will skip traces-only flushers; modify
the check in the S3 partition timer startup so it starts the ticker if the
marshaler implements either logFlusher or traceFlusher (i.e., replace the single
type assertion on m.(logFlusher) with a check that tests for logFlusher OR
traceFlusher), then keep the existing time zone logic and the call to go
e.runEvery(..., func() { if err := e.flushMarshaler(ctx, "timer"); ... })
unchanged so traces-only marshalers will also trigger periodic flushes.

In `@exporter/awss3exporter/json_batching_marshaler.go`:
- Around line 25-55: The struct-level uncompressedSize is shared between logs
and traces and can cause cross-signal flush bugs; split it into two distinct
counters logsUncompressedSize and tracesUncompressedSize on
jsonBatchingMarshaler and update all places that currently read/write
uncompressedSize (the logs path and traces path methods that increment the
counter and the flush methods that reset it) to use the appropriate new field,
initialize them in newJSONBatchingMarshaler, and ensure flush logic for logs
only resets logsUncompressedSize while trace flushes reset
tracesUncompressedSize; alternatively, if you prefer not to split, add a clear
comment on jsonBatchingMarshaler documenting the single-instance-per-signal
invariant and assert that invariant where the marshaler is created.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 43e5d953-27a4-4975-a0b3-0222e1bb550e

📥 Commits

Reviewing files that changed from the base of the PR and between a16d480 and 2a7c8ef.

📒 Files selected for processing (3)
  • exporter/awss3exporter/exporter.go
  • exporter/awss3exporter/json_batching_marshaler.go
  • exporter/awss3exporter/json_batching_marshaler_test.go

Comment thread exporter/awss3exporter/exporter.go
Comment thread exporter/awss3exporter/exporter.go
- Fix misleading comment on jsonBatchingMarshaler: document resource
  routing as a known limitation matching old fork, not as handled
- Propagate actual error from newMarshalerWithConfig instead of generic msg
- Guard shutdown against double-close panic with sync.Once

Refs: SAW-7101

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 2 files (changes from recent commits).

Requires human review: This PR introduces significant new logic for batching, timer-based flushing, and modifies the S3 path structure. These changes to the core data path require human review.

@sawmills-architect-review sawmills-architect-review Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] Re-review: REQUEST_CHANGES

The error handling and shutdown fixes look good. One blocking regression remains: timer/shutdown flushes still upload buffered data with nil UploadOptions, so any resource_attrs_to_s3 routing falls back to the default bucket/prefix. That can misroute batches once the periodic flush or shutdown path runs.

Comment thread exporter/awss3exporter/exporter.go

@sawmills-staging sawmills-staging Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared with previous telemetry review: 1 new, 0 still open, 1 fixed, 0 accepted.

Advisory: [MEDIUM] flushMarshaler (called by both the timer and shutdown paths) calls uploadBuffer — which correctly records upload_start/upload_complete — but never calls recordFlushStart or recordFlushComplete. The result is a systematic routing error: otelcol_exporter_awss3_flush_start_total, otelcol_exporter_awss3_flush_complete_total, and otelcol_exporter_awss3_flush_duration only receive increments from ConsumeLogs (the per-record path), which returns empty bytes and skips uploadBuffer whenever the batch is below maxBatchSize — i.e., on the vast majority of calls. The actual partition-boundary flushes (timer, once per minute or hour) and the graceful-shutdown flush, which are the only paths that produce real S3 objects in the batching marshaler design, contribute zero observations to those three metrics. Any dashboard, alert, or SLO built on flush_complete_total or flush_duration will reflect per-record no-op calls rather than true flush events, making flush success rate and flush latency signals unreliable.
Action: Add recordFlushStart/recordFlushComplete calls inside flushMarshaler, bracketing each FlushLogs/FlushTraces call, mirroring the pattern in ConsumeLogs. Capture a flushStartedAt timestamp before each FlushLogs/FlushTraces call and pass the elapsed duration and error to recordFlushComplete after the call. Pass the reason string ("timer" or "shutdown") as the reason argument so the flush_complete_total and flush_duration metrics carry the same label cardinality as the ConsumeLogs path. Example for the log branch: flushStartedAt := time.Now() e.telemetry.recordFlushStart(ctx, e.signalType) buf, err := flusher.FlushLogs() e.telemetry.recordFlushComplete(ctx, e.signalType, reason, time.Since(flushStartedAt), err) Apply the same pattern to the trace branch.
Location: exporter/awss3exporter/exporter.go:200.

@sawmills-architect-review sawmills-architect-review Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] REQUEST_CHANGES

The new flush path is functionally incomplete for routed exports. Timer/shutdown flushes always upload with nil upload options, so any batch written by the new marshaler bypasses resource_attrs_to_s3 and falls back to the default bucket/prefix. That means low-volume streams can land in the wrong S3 location and be co-mingled across resources.

The thread-resolved state is clean, but this routing regression still needs a fix before merge.

// prefix because batching marshalers accumulate data without per-resource
// context. Resource-based routing only applies to per-record uploads.
if len(buf) > 0 {
if err := e.uploadBuffer(ctx, buf, nil, flushMetadata{reason: reason}); err != nil {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [concern] This flush path drops resource-derived upload options. Timer/shutdown flushes will always go to the default bucket/prefix, so resource_attrs_to_s3 routing is bypassed for exactly the buffered batches this code is meant to rescue. That can mix unrelated resources in the same object path. Preserve the upload options for the batch, or track per-resource batches so flushed objects still route correctly.

Address 5 remaining gaps found in pre-merge audit:

1. Default MaxFileSizeBytes to 256MB — OtlpJSON always uses batching
   marshaler by default, matching old fork behavior
2. Port FlushLogsWithReason interface — extensions can receive flush
   reason on timer/shutdown flushes
3. Trace JSONL: unwrap resourceSpans into individual entries, wrap each
   in OTLP envelope on flush — matches old fork output format
4. Document S3KeyTemplate function limitations (no full Sprig)
5. Default partition to hour-level — matches old fork factory default

Also updates test expectations for new defaults.

Refs: SAW-7101

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@sawmills-staging sawmills-staging Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compared with previous telemetry review: 0 new, 1 still open, 1 fixed, 0 accepted.

Advisory: [LOW] flushLogBatch() (line 102) and flushTraceBatch() (line 181) each emit a Debug log on every invocation. flushLogBatch() is called from MarshalLogs() whenever uncompressedSize >= maxBatchSize — firing on every size-threshold crossing during hot-path ingestion — and from FlushLogs() at timer/shutdown boundaries. flushTraceBatch() follows the same pattern via MarshalTraces(). The fields logged (batch_count, uncompressed_size) duplicate information already observable through the OTel metrics introduced in the same change (otelcol_exporter_awss3_flush_complete_total, otelcol_exporter_awss3_upload_bytes, otelcol_exporter_awss3_upload_object_size). When an operator enables debug logging to diagnose a real issue, these two calls become the dominant log stream, masking more actionable signals. The org pipeline routes logs to log-indexing destinations (datadog, otlpHttp) in addition to the awsS3 destination, so any debug-level log drain that reaches those indexing destinations would also inflate ingestion volume. The previous finding was marked resolved but the Debug calls were not removed — they are present unchanged in the current head.
Action: Remove both Debug calls from the internal flushLogBatch() and flushTraceBatch() helpers. If flush confirmation is needed for debugging, emit a single Debug log at the FlushLogs()/FlushTraces() public entry points (timer and shutdown paths only), not inside the internal helpers that are also called on every size-threshold crossing. The existing flush/upload metrics (flush_complete_total, upload_bytes, upload_object_size) already provide the same batch_count and size observability without per-flush log noise.
Location: exporter/awss3exporter/json_batching_marshaler.go:102.

@sawmills-architect-review sawmills-architect-review Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ARCH-REVIEW] Re-review: REQUEST_CHANGES

1 blocking issue remains, timer/shutdown flushes still drop resource-derived upload options so resource_attrs_to_s3 routing can be bypassed after buffering. 1 unresolved review thread remains.

@@ -84,6 +87,7 @@ func newS3Exporter(
signalType: signalType,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 [blocking] Previously flagged, still unaddressed: timer/shutdown flushes still upload buffered bytes with nil options, so any resource_attrs_to_s3 override is lost once data has been buffered across multiple calls. batchperresourceattr only splits incoming batches, it does not preserve route metadata in s3Exporter's buffer. Keep the UploadOptions with each buffered batch, or split buffers by route.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 issues found across 9 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="exporter/awss3exporter/json_batching_marshaler.go">

<violation number="1" location="exporter/awss3exporter/json_batching_marshaler.go:150">
P2: Trace batch size accounting underestimates output size by ignoring the per-line OTLP envelope, so `max_file_size_bytes` can flush late and produce larger-than-configured objects.</violation>
</file>

<file name="exporter/awss3exporter/factory.go">

<violation number="1" location="exporter/awss3exporter/factory.go:65">
P1: Setting `MaxFileSizeBytes` non-zero in the default config enables JSON batching for all `otlp_json` users, but batching flushes bypass `resource_attrs_to_s3` routing (`nil` UploadOptions). This changes default behavior and can send flushed data to the wrong S3 prefix/bucket.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review, or fix all with cubic.

},
MarshalerName: "otlp_json",
MarshalerName: "otlp_json",
MaxFileSizeBytes: defaultMaxFileSizeBytes,

@cubic-dev-ai cubic-dev-ai Bot Apr 17, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Setting MaxFileSizeBytes non-zero in the default config enables JSON batching for all otlp_json users, but batching flushes bypass resource_attrs_to_s3 routing (nil UploadOptions). This changes default behavior and can send flushed data to the wrong S3 prefix/bucket.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At exporter/awss3exporter/factory.go, line 65:

<comment>Setting `MaxFileSizeBytes` non-zero in the default config enables JSON batching for all `otlp_json` users, but batching flushes bypass `resource_attrs_to_s3` routing (`nil` UploadOptions). This changes default behavior and can send flushed data to the wrong S3 prefix/bucket.</comment>

<file context>
@@ -55,13 +55,14 @@ func createDefaultConfig() component.Config {
 		},
-		MarshalerName: "otlp_json",
+		MarshalerName:    "otlp_json",
+		MaxFileSizeBytes: defaultMaxFileSizeBytes,
 	}
 }
</file context>
Fix with Cubic

for _, span := range spans {
m.traceBatches = append(m.traceBatches, span)
m.tracesBatchCount++
m.uncompressedSize += len(span)

@cubic-dev-ai cubic-dev-ai Bot Apr 17, 2026

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Trace batch size accounting underestimates output size by ignoring the per-line OTLP envelope, so max_file_size_bytes can flush late and produce larger-than-configured objects.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At exporter/awss3exporter/json_batching_marshaler.go, line 150:

<comment>Trace batch size accounting underestimates output size by ignoring the per-line OTLP envelope, so `max_file_size_bytes` can flush late and produce larger-than-configured objects.</comment>

<file context>
@@ -121,18 +121,36 @@ func (m *jsonBatchingMarshaler) MarshalTraces(td ptrace.Traces) ([]byte, error)
+	for _, span := range spans {
+		m.traceBatches = append(m.traceBatches, span)
+		m.tracesBatchCount++
+		m.uncompressedSize += len(span)
+	}
+
</file context>
Suggested change
m.uncompressedSize += len(span)
m.uncompressedSize += len(span) + len(`{"resourceSpans":[]}`)
Fix with Cubic

@ishaish103 ishaish103 merged commit bc02473 into main Apr 17, 2026
189 of 197 checks passed
@ishaish103 ishaish103 deleted the fix/SAW-7101-port-legacy-awss3-capabilities branch April 17, 2026 21:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants