feat(awss3exporter): add legacy awss3 and parquet compatibility#37
Conversation
|
You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds legacy S3 config inputs and validation; instruments awss3 exporter with flush/upload telemetry and flush-metadata plumbing; extends partition key templating and SSE/static creds support; introduces a parquet log encoding extension that buffers/converts logs to Parquet, exposes flush-with-metadata APIs, and provides telemetry. Changes
Sequence Diagram(s)sequenceDiagram
participant Collector as Collector
participant ParquetExt as Parquet Log Encoding Extension
participant ParquetWriter as Parquet Writer
participant MemFile as In-Memory File
participant Metrics as Telemetry Metrics
Collector->>ParquetExt: ConsumeLogs(logs)
activate ParquetExt
ParquetExt->>Metrics: recordBufferState()
ParquetExt->>ParquetExt: ConvertToParquet(logs)
ParquetExt->>ParquetWriter: Write(records)
ParquetWriter->>MemFile: Write payload
ParquetExt->>Metrics: Update buffer size/count
alt Size threshold exceeded
ParquetExt->>Metrics: recordFlushAttempt()
ParquetExt->>ParquetWriter: Stop()
ParquetWriter->>MemFile: Finalize
ParquetExt->>MemFile: ReadBytes()
MemFile-->>ParquetExt: Parquet bytes
ParquetExt->>ParquetWriter: NewWriter()
ParquetExt->>Metrics: recordFlush(reason, duration)
ParquetExt-->>Collector: bytes + metadata
else No flush
ParquetExt-->>Collector: nil
end
deactivate ParquetExt
sequenceDiagram
participant Collector as Collector
participant S3Exp as awss3 Exporter
participant Telemetry as Exporter Telemetry
participant Marshaler as Log Marshaler
participant Uploader as S3 Uploader
Collector->>S3Exp: ConsumeLogs(logs)
activate S3Exp
S3Exp->>Telemetry: recordFlushStart()
S3Exp->>Marshaler: MarshalLogsWithFlushMetadata(logs)
Marshaler-->>S3Exp: bytes, flushMetadata
S3Exp->>Telemetry: recordFlushEnd(flushMetadata)
S3Exp->>Telemetry: recordUploadStart()
S3Exp->>Uploader: Upload(bytes)
Uploader-->>S3Exp: success/error
S3Exp->>Telemetry: recordUploadEnd(duration)
S3Exp->>Telemetry: recordFlushToUploadDuration(...)
S3Exp-->>Collector: result
deactivate S3Exp
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
8 issues found across 31 files
Confidence score: 3/5
- There is some meaningful merge risk: several medium-to-high severity, high-confidence issues (6–7/10) affect runtime behavior, so this is not just test-only cleanup.
- The most severe issue is in
extension/parquetlogencodingextension/mem.go:Close()can ignore reopen failures before callingOnClose, which can mask real shutdown/open-state errors and lead to incorrect lifecycle handling. exporter/awss3exporter/internal/upload/partition.goandexporter/awss3exporter/config.goboth have concrete correctness risks (empty/legacy S3 key behavior and partial static credential fallback), andextension/parquetlogencodingextension/adapters/datadog/adapter.gocan emit incorrect messages in ddtags mode.- Pay close attention to
extension/parquetlogencodingextension/mem.go,exporter/awss3exporter/internal/upload/partition.go,exporter/awss3exporter/config.go, andextension/parquetlogencodingextension/adapters/datadog/adapter.go- they contain the highest-impact correctness and concurrency/lifecycle concerns.
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="exporter/awss3exporter/internal/upload/partition.go">
<violation number="1" location="exporter/awss3exporter/internal/upload/partition.go:64">
P2: Do not return the legacy template key unconditionally; template errors currently collapse to an empty S3 key.</violation>
</file>
<file name="exporter/awss3exporter/s3_writer_test.go">
<violation number="1" location="exporter/awss3exporter/s3_writer_test.go:148">
P2: This test has a data race: `s3Authorization` is written in the HTTP handler goroutine and read in the test goroutine without synchronization.</violation>
</file>
<file name="extension/parquetlogencodingextension/extension.go">
<violation number="1" location="extension/parquetlogencodingextension/extension.go:341">
P2: Close the opened parquet file handle even when reading bytes fails; the current early return leaks the open file on read errors.</violation>
</file>
<file name="exporter/awss3exporter/config.go">
<violation number="1" location="exporter/awss3exporter/config.go:46">
P2: Validate static AWS credentials as a pair (both set or both unset) to avoid silent fallback to ambient credentials on partial configuration.</violation>
</file>
<file name="extension/parquetlogencodingextension/mem.go">
<violation number="1" location="extension/parquetlogencodingextension/mem.go:30">
P2: Avoid unsynchronized check-then-set on the package-global `memFs`; concurrent extension initialization can race here and corrupt shared filesystem state.</violation>
<violation number="2" location="extension/parquetlogencodingextension/mem.go:93">
P1: Handle the reopen error in `Close()` before invoking `OnClose`; currently open failures are silently ignored.</violation>
</file>
<file name="exporter/awss3exporter/exporter_test.go">
<violation number="1" location="exporter/awss3exporter/exporter_test.go:198">
P2: This test uses a hard-coded 150ms cap for `flush_to_upload_duration`, which is timing-sensitive and can fail nondeterministically under CI scheduling delays.</violation>
</file>
<file name="extension/parquetlogencodingextension/adapters/datadog/adapter.go">
<violation number="1" location="extension/parquetlogencodingextension/adapters/datadog/adapter.go:150">
P2: `transformWithDdTags` ignores `msg`/`message`/`log` attributes, so ddtags-mode records can produce the wrong message by falling back to `record.Body()`.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review, or fix all with cubic.
There was a problem hiding this comment.
Actionable comments posted: 7
🧹 Nitpick comments (5)
exporter/awss3exporter/exporter_test.go (1)
146-199: Comprehensive telemetry test with one timing fragility concern.The test thoroughly validates flush/upload telemetry metrics including counters, histograms, and attributes. However, the assertion on Line 198 uses a hardcoded threshold:
assert.Less(t, flushToUploadPoint.Sum, int64(150))This timing-based assertion could be flaky in slow CI environments. Consider either:
- Increasing the threshold with a safety margin
- Making the assertion relative to
handoffGapwith a multiplier- Documenting why 150ms is a reasonable upper bound
💡 Suggestion to make timing assertion more resilient
const ( expectedReason = "manual" handoffGap = 50 * time.Millisecond uploadDelay = 200 * time.Millisecond + // Allow 3x handoffGap for CI environment variability + maxFlushToUploadDuration = 3 * handoffGap ) ... -assert.Less(t, flushToUploadPoint.Sum, int64(150)) +assert.Less(t, flushToUploadPoint.Sum, maxFlushToUploadDuration.Milliseconds())🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/exporter_test.go` around lines 146 - 199, The test TestExporterRecordsEquivalentFlushAndUploadTelemetry uses a fragile hardcoded threshold (assert.Less(..., int64(150))) for flush-to-upload latency; replace that fixed 150ms with a resilient computed threshold based on the test timing variables (e.g., use handoffGap.Milliseconds() multiplied by a safety factor like 3 or take the max of that and a baseline) and assert flushToUploadPoint.Sum is less than that computed threshold so the check scales with handoffGap and avoids CI flakiness; update the assertion referencing flushToUploadPoint, handoffGap and uploadDelay (if desired) to compute the threshold before the final assert.extension/parquetlogencodingextension/generated_component_test.go (1)
24-37: Emptytests::configin metadata.yaml may cause test brittleness.The test loads configuration from
metadata.yaml'stests::configsection, but based on the relevant snippets, this section is empty. While the test will pass becauseCreateDefaultConfig()populates all required defaults andUnmarshalon an empty config section won't override them, this approach is fragile:
- The test doesn't validate any specific configuration scenarios
- If required fields without defaults are added later, this test will silently fail
Consider adding explicit configuration values to
tests::configinmetadata.yamlto make the test more meaningful and resilient.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/generated_component_test.go` around lines 24 - 37, TestComponentLifecycle currently unmarshals an empty "tests::config" from metadata.yaml which makes the test brittle; update the test to supply explicit configuration values instead of relying on defaults by either adding concrete entries to the "tests::config" section in metadata.yaml or setting fields directly on cfg returned by CreateDefaultConfig() before calling factory.Create; ensure required fields for the extension (referenced by NewFactory, CreateDefaultConfig, and factory.Create) are populated so the test asserts a meaningful configuration scenario and remains robust if new non-default fields are introduced.exporter/awss3exporter/s3_writer.go (2)
75-79: Duplicate endpoint configuration.The
BaseEndpointis set twice in thes3Optsslice whenconf.S3Uploader.Endpointis non-empty, once at lines 75-79 and again at lines 93-97. This is redundant.♻️ Suggested fix to remove duplicate endpoint setting
Remove the second occurrence (lines 93-97):
if arn := conf.S3Uploader.RoleArn; arn != "" { s3Opts = append(s3Opts, func(o *s3.Options) { o.Credentials = stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), arn) }) } - - if endpoint := conf.S3Uploader.Endpoint; endpoint != "" { - s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = aws.String(endpoint) - }) - }Also applies to: 93-97
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/s3_writer.go` around lines 75 - 79, The BaseEndpoint is being appended to s3Opts twice when conf.S3Uploader.Endpoint is non-empty; remove the duplicate block that sets o.BaseEndpoint (the second occurrence) so BaseEndpoint is only configured once—locate the repeated anonymous option that modifies s3.Options.BaseEndpoint and delete the later one, leaving a single conditional that appends the endpoint-setting func when conf.S3Uploader.Endpoint != "".
157-178: Reflection-based SSE application is fragile but acceptable for this use case.Using reflection to set
ServerSideEncryptionon the request parameters works around the AWS SDK's middleware abstraction. The defensive checks (nil, pointer kind, struct kind, field validity, settability) are thorough. However, this approach is brittle and may break if AWS SDK changes the parameter struct's field name or type.Consider adding a comment explaining why reflection is used here (e.g., "The AWS SDK v2 doesn't expose a direct option for SSE on the S3 client options, so we use middleware to set it on each request's parameters").
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/s3_writer.go` around lines 157 - 178, The applyServerSideEncryption function uses reflection to set the ServerSideEncryption string field on request parameter structs because the AWS SDK v2 doesn't expose a direct S3 client option for per-request SSE; add a concise comment above applyServerSideEncryption explaining that reflection is used intentionally as a workaround for AWS SDK v2/middleware limitations, noting the risk that renaming/changing the ServerSideEncryption field in the SDK would break this code and that the defensive checks (pointer, struct, field validity, CanSet, string kind) are in place to mitigate that fragility.exporter/awss3exporter/internal/upload/partition.go (1)
77-94: Silent failure on template errors may hide configuration issues.When
template.Parseortemplate.Executefails,buildLegacyTemplateKeyreturns an empty string silently. This could lead to unexpected behavior (e.g., empty S3 keys) that would be difficult to diagnose. Consider logging a warning or propagating the error.♻️ Suggested improvement to log template errors
One option is to change the function signature to return an error, or accept a logger to emit warnings. If keeping the current signature for compatibility:
func buildLegacyTemplateKey(prefix, templateText string, ts time.Time) string { tmpl, err := template.New("legacy-s3-key").Parse(templateText) if err != nil { + // Log or handle parse error - currently fails silently return "" } var rendered bytes.Buffer err = tmpl.Execute(&rendered, legacyTemplateData{ Prefix: prefix, Date: ts.UTC().Format("2006/01/02"), UUID: uuid.NewString(), }) if err != nil { + // Log or handle execute error - currently fails silently return "" } return rendered.String() }Alternatively, the caller (
PartitionKeyBuilder.Build) could fall back to the standard path when the legacy template returns empty.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/internal/upload/partition.go` around lines 77 - 94, buildLegacyTemplateKey currently swallows errors from template.Parse and tmpl.Execute and returns an empty string; change it to return (string, error) so callers can detect failures (e.g., func buildLegacyTemplateKey(prefix, templateText string, ts time.Time) (string, error)), propagate the error up to PartitionKeyBuilder.Build (or whichever calls buildLegacyTemplateKey) and make the caller either log a warning or fall back to the standard key-building path when an error is returned; specifically surface parse/execute errors from buildLegacyTemplateKey instead of returning "" so template.Parse and tmpl.Execute failures are visible to the caller.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@exporter/awss3exporter/factory.go`:
- Line 76: The factory tests (TestCreateMetrics, TestCreateTraces,
TestCreateLogs) currently only assert exporter creation; add assertions that
cfg.S3Uploader.S3PartitionFormat was set to the normalized value by the factory
(the assignment at cfg.S3Uploader.S3PartitionFormat =
cfg.normalizedS3PartitionFormat()). After creating the exporter in each test,
fetch the config used (or the factory config) and assert
S3Uploader.S3PartitionFormat equals cfg.normalizedS3PartitionFormat() or the
expected normalized string; reference the normalizedS3PartitionFormat() helper
and the S3Uploader.S3PartitionFormat field to locate the code to validate.
In `@extension/parquetlogencodingextension/adapters/datadog/adapter.go`:
- Around line 166-173: The resource attribute copy currently overwrites
per-record attributes in item.Attributes; change the resource.Attributes().Range
loop used when populating item.Attributes (and the analogous block around
flattenAttribute at the other location) to check if a key already exists in
item.Attributes before assigning, so resource-level keys are only set when
absent (i.e., do not clobber existing item.Attributes entries produced from the
log record); use the existing flattenAttribute helper and item.Attributes map
lookups to implement the conditional assignment.
- Around line 147-191: Both transformWithDdTags and transformDefault must
produce identical message and status handling: ensure both functions read
message keys ("msg","message","log") from record.Attributes() and fallback to
record.Body().AsString() the same way, and ensure they set both the item.Message
field and the corresponding item.Attributes["message"] consistently; likewise
compute status once (using record.SeverityText/SeverityNumber and attribute keys
"status","severity","level","syslog.severity") and set both item.Status
(lowercased) and item.Attributes["status"] to the same value. Update the logic
in transformWithDdTags and transformDefault to reuse the same extraction/casing
code paths (referencing variables/status assignment, item.Message, item.Status,
and item.Attributes) so resource.ddtags being a map does not change
serialization.
In `@extension/parquetlogencodingextension/config.go`:
- Around line 10-16: defaultRowGroupSizeBytes (128 MiB) is larger than
defaultMaxFileSizeBytes (100 MiB), which can cause row groups to exceed intended
file size; either reduce defaultRowGroupSizeBytes to <= defaultMaxFileSizeBytes
or add validation in the config's Validate() method to enforce RowGroupSizeBytes
<= MaxFileSizeBytes (and return a clear error) so callers cannot construct
configs that violate the file size constraint; reference the constants
defaultRowGroupSizeBytes, defaultMaxFileSizeBytes and the Validate() function
when making the change.
In `@extension/parquetlogencodingextension/extension_test.go`:
- Around line 105-107: The test currently constructs the extension via
NewParquetLogExtension using extensiontest.NewNopSettings which provides a no-op
meter provider, so metrics paths are unvalidated; replace or augment the test in
extension_test.go to instantiate a real MeterProvider (e.g.,
sdkmetric.NewMeterProvider with a ManualReader or in-memory exporter) and pass
settings that use that provider instead of NewNopSettings, then assert that the
expected counters/gauges/histogram/callback are created and emit values when
exercising NewParquetLogExtension and its lifecycle (use testExtensionType to
locate the extension creation). Ensure the new test(s) fail if metrics are not
registered or no measurements are recorded.
- Around line 125-127: The tests always set resource.ddtags via
resourceLogs.Resource().Attributes().PutEmptyMap("ddtags"), so the exercises
always follow transformWithDdTags and never exercise transformDefault; update
the test(s) to add a case that omits creating "ddtags" (i.e., do not call
PutEmptyMap/PutStr for "ddtags") and then call writeDatadogLogs (or directly
invoke transformDefault in an adapter-focused unit test) and assert the fallback
behavior/output expected from transformDefault (e.g., encoded fields, tags, or
return path), or add a separate unit test that constructs ResourceLogs without
the ddtags attribute and verifies transformDefault is used.
In `@extension/parquetlogencodingextension/mem.go`:
- Around line 88-98: The Close method on MemFile currently ignores errors from
fs.Open and never closes the reopened handle; update MemFile.Close so that when
fs.OnClose is non-nil it calls f, err := fs.Open(fs.FilePath), checks and
returns the err if non-nil, and ensures the reopened reader is closed after the
callback (e.g., defer f.Close() immediately after successful open and before
calling fs.OnClose(filepath.Base(fs.FilePath), f)) so OnClose gets a valid
reader and the handle is not leaked.
---
Nitpick comments:
In `@exporter/awss3exporter/exporter_test.go`:
- Around line 146-199: The test
TestExporterRecordsEquivalentFlushAndUploadTelemetry uses a fragile hardcoded
threshold (assert.Less(..., int64(150))) for flush-to-upload latency; replace
that fixed 150ms with a resilient computed threshold based on the test timing
variables (e.g., use handoffGap.Milliseconds() multiplied by a safety factor
like 3 or take the max of that and a baseline) and assert flushToUploadPoint.Sum
is less than that computed threshold so the check scales with handoffGap and
avoids CI flakiness; update the assertion referencing flushToUploadPoint,
handoffGap and uploadDelay (if desired) to compute the threshold before the
final assert.
In `@exporter/awss3exporter/internal/upload/partition.go`:
- Around line 77-94: buildLegacyTemplateKey currently swallows errors from
template.Parse and tmpl.Execute and returns an empty string; change it to return
(string, error) so callers can detect failures (e.g., func
buildLegacyTemplateKey(prefix, templateText string, ts time.Time) (string,
error)), propagate the error up to PartitionKeyBuilder.Build (or whichever calls
buildLegacyTemplateKey) and make the caller either log a warning or fall back to
the standard key-building path when an error is returned; specifically surface
parse/execute errors from buildLegacyTemplateKey instead of returning "" so
template.Parse and tmpl.Execute failures are visible to the caller.
In `@exporter/awss3exporter/s3_writer.go`:
- Around line 75-79: The BaseEndpoint is being appended to s3Opts twice when
conf.S3Uploader.Endpoint is non-empty; remove the duplicate block that sets
o.BaseEndpoint (the second occurrence) so BaseEndpoint is only configured
once—locate the repeated anonymous option that modifies s3.Options.BaseEndpoint
and delete the later one, leaving a single conditional that appends the
endpoint-setting func when conf.S3Uploader.Endpoint != "".
- Around line 157-178: The applyServerSideEncryption function uses reflection to
set the ServerSideEncryption string field on request parameter structs because
the AWS SDK v2 doesn't expose a direct S3 client option for per-request SSE; add
a concise comment above applyServerSideEncryption explaining that reflection is
used intentionally as a workaround for AWS SDK v2/middleware limitations, noting
the risk that renaming/changing the ServerSideEncryption field in the SDK would
break this code and that the defensive checks (pointer, struct, field validity,
CanSet, string kind) are in place to mitigate that fragility.
In `@extension/parquetlogencodingextension/generated_component_test.go`:
- Around line 24-37: TestComponentLifecycle currently unmarshals an empty
"tests::config" from metadata.yaml which makes the test brittle; update the test
to supply explicit configuration values instead of relying on defaults by either
adding concrete entries to the "tests::config" section in metadata.yaml or
setting fields directly on cfg returned by CreateDefaultConfig() before calling
factory.Create; ensure required fields for the extension (referenced by
NewFactory, CreateDefaultConfig, and factory.Create) are populated so the test
asserts a meaningful configuration scenario and remains robust if new
non-default fields are introduced.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 891bc20a-081b-44c1-b3be-6643b0645d43
⛔ Files ignored due to path filters (1)
extension/parquetlogencodingextension/go.sumis excluded by!**/*.sum
📒 Files selected for processing (30)
exporter/awss3exporter/config.goexporter/awss3exporter/config_test.goexporter/awss3exporter/exporter.goexporter/awss3exporter/exporter_test.goexporter/awss3exporter/factory.goexporter/awss3exporter/internal/upload/partition.goexporter/awss3exporter/internal/upload/partition_test.goexporter/awss3exporter/marshaler_test.goexporter/awss3exporter/s3_marshaler.goexporter/awss3exporter/s3_writer.goexporter/awss3exporter/s3_writer_test.goexporter/awss3exporter/testdata/legacy-config.yamlexporter/awss3exporter/testdata/legacy-datadog-json.yamlexporter/awss3exporter/testdata/legacy-parquet.yamlextension/parquetlogencodingextension/README.mdextension/parquetlogencodingextension/adapters/adapter.goextension/parquetlogencodingextension/adapters/datadog/adapter.goextension/parquetlogencodingextension/adapters/datadog/adapter_test.goextension/parquetlogencodingextension/config.goextension/parquetlogencodingextension/config_test.goextension/parquetlogencodingextension/extension.goextension/parquetlogencodingextension/extension_test.goextension/parquetlogencodingextension/factory.goextension/parquetlogencodingextension/generated_component_test.goextension/parquetlogencodingextension/go.modextension/parquetlogencodingextension/internal/metadata/generated_status.goextension/parquetlogencodingextension/internal/metadata/generated_telemetry.goextension/parquetlogencodingextension/mem.goextension/parquetlogencodingextension/metadata.yamlextension/parquetlogencodingextension/telemetry.go
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (4)
extension/parquetlogencodingextension/mem.go (1)
28-51:Openoverwritesfs.Filewithout closing the previous handle.If
Openis called multiple times (e.g., during error recovery or retries), the previous file handle stored infs.Fileis overwritten without being closed, potentially leaking the in-memory file descriptor.From the current usage in
defaultReadWriterBytes, this pattern isn't triggered becauseOpenis called once afterClose. However, for defensive coding and future-proofing:🛡️ Suggested defensive fix
func (fs *MemFile) Open(name string) (source.ParquetFile, error) { if name == "" { name = fs.FilePath } + if fs.File != nil { + _ = fs.File.Close() // Best-effort close of previous handle + } file, err := fs.Fs.Open(name) if err != nil { return fs, err } fs.File = file fs.FilePath = name return fs, nil }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/mem.go` around lines 28 - 51, The Open method overwrites fs.File without closing any existing handle, causing a potential resource leak; modify MemFile.Open to check if fs.File is non-nil and close it (handle/propagate the close error appropriately) before calling fs.Fs.Open and assigning the new file and fs.FilePath, and ensure similar defensive behavior in MemFile.Create if it can be called when fs.File is already set; reference the MemFile.Open and MemFile.Create methods and the fs.File field when making the change.extension/parquetlogencodingextension/extension.go (3)
282-305: Snapshot performs shallow copy of records.Line 287 uses
copy()which performs a shallow copy of the slice. If the record objects are later modified (before restore is needed), the snapshot will contain the modified data.From the code flow, this appears safe because:
- Records are only written to
Objs, never modified in place- Restore is only called immediately after a failure
However, this assumption should be documented or the copy made deep if records could be mutable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/extension.go` around lines 282 - 305, snapshotBufferedStateLocked currently makes a shallow copy of e.writer.Objs into bufferedStateSnapshot.records which can lead to stale/modified data if record objects are mutated; update snapshotBufferedStateLocked to perform a defensive deep-copy of each record (or serialize/clone them) before storing in bufferedStateSnapshot.records, or add a clear comment in the function and in the bufferedStateSnapshot type documenting the invariant that records are never mutated after being appended to e.writer.Objs and that restoreBufferedStateLocked assumes immutability; reference the functions snapshotBufferedStateLocked, restoreBufferedStateLocked, bufferedStateSnapshot and the writer field e.writer.Objs when applying the change.
29-50: Storingcontext.Contextin struct is discouraged.The
ctxfield (line 32) stores a context passed at construction time. This context is used later inMarshalLogsWithFlushMetadata(line 173) forConvertToParquet. If this context is cancelled or times out between construction and use, operations will fail unexpectedly.Consider passing context to methods that need it (like
MarshalLogs) instead of storing it in the struct, following Go best practices.♻️ Suggested approach
type parquetLogExtension struct { memFile *MemFile logger *zap.Logger - ctx context.Context config *Config // ... rest of fields } -func (e *parquetLogExtension) MarshalLogsWithFlushMetadata( +func (e *parquetLogExtension) MarshalLogsWithFlushMetadata( + ctx context.Context, ld plog.Logs, ) ([]byte, string, time.Time, error) { - toParquet, err := e.adapter.ConvertToParquet(e.ctx, ld) + toParquet, err := e.adapter.ConvertToParquet(ctx, ld)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/extension.go` around lines 29 - 50, The struct parquetLogExtension currently stores a context in the ctx field which is discouraged; remove the ctx field and update methods that need a context (e.g., MarshalLogsWithFlushMetadata, MarshalLogs, and any call sites that invoke ConvertToParquet) to accept a context.Context parameter instead of reading from the struct; update the constructor/initializers so they no longer store ctx and propagate the caller-supplied context into each method call (and into ConvertToParquet invocations) at the time of use to avoid using a stale/cancelled context.
92-125: Config mutation ininitializeWriteris side-effectful.Lines 93-101 mutate
e.configfields if they have default/zero values. This is problematic because:
initializeWriteris called during construction and during restore/reinitialization- The config object may be shared or inspected elsewhere
- Default application should happen once during config validation, not repeatedly
Consider applying defaults in a config validation step or in the factory before storing the config.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/extension.go` around lines 92 - 125, The initializeWriter method mutates e.config (CompressionCodec, PageSizeBytes, RowGroupSizeBytes) which can leak defaults into shared config; instead, implement a config validation/factory step (e.g., validateConfig or NewParquetLogExtension constructor) that applies defaults once before storing the config, and remove any writes to e.config from initializeWriter; update initializeWriter (and restore paths) to read the config immutably and compute local values (e.g., compressionCodec := e.config.CompressionCodec; if empty set local to defaultCompressionCodec; same for pageSize and rowGroupSize) and then use those locals to set pw.CompressionType, pw.PageSize, and pw.RowGroupSize so no mutation of e.config occurs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@exporter/awss3exporter/internal/upload/partition.go`:
- Around line 78-97: Add two unit tests for buildLegacyTemplateKey: one that
passes a syntactically invalid templateText (e.g. "{{invalid") to trigger the
parse error branch and assert buildLegacyTemplateKey returns "" (and optionally
assert a log entry), and a second that triggers the Execute error branch by
using a template that parses but fails at execution (for example "{{index
.Prefix 9999}}" which will panic/index-out-of-range at Execute) and assert it
returns "" (and optionally assert the error log). Target the
buildLegacyTemplateKey function and legacyTemplateData shape in your tests so
they exercise the parse and execute error branches directly.
- Around line 65-67: buildLegacyTemplateKey currently returns an empty string on
template parse/execute failures, which allows an empty S3 key to be passed to
the uploader; change buildLegacyTemplateKey (and its caller
legacyTemplatePrefix/where it’s invoked in partition.go) to return an error
instead of silently returning "" or, if you prefer seamless behavior, fall back
to the standard key path by calling the non-legacy key builder when a template
error occurs; update the call site in partition.go to handle the error (or
accept the fallback key) so writer.go never receives an empty key, and add unit
tests covering template parse and execution failures to assert the error
propagation or fallback behavior.
In `@extension/parquetlogencodingextension/extension.go`:
- Around line 51-90: The code in NewParquetLogExtension does an unchecked type
assertion baseCfg.(*Config) which can panic; change this to a safe type check by
using the comma-ok idiom (or a type switch) to verify baseCfg is a *Config and
return a clear error if not, e.g., check the assertion result into cfg and ok,
handle !ok by returning nil and fmt.Errorf with context, ensuring subsequent
uses of cfg (e.g., cfg.MaxFileSizeBytes, fileName creation, initializeWriter)
only run when cfg is valid.
- Around line 203-280: The current logic in checkAndFlushWithMetadata calls
restoreBufferedStateLocked after reinitializeWriterFn fails, but
restoreBufferedStateLocked itself calls reinitializeWriterFn, so a persistent
reinitialize error will cause an immediate repeat of the same failing call;
change checkAndFlushWithMetadata to NOT call restoreBufferedStateLocked when
reinitializeWriterFn returns an error — instead record the failed flush, log the
reinitialize error (as already done), and return the original reinitialize error
(possibly wrapped) so we avoid a redundant reinitialize attempt; update
references in this function (reinitializeWriterFn, restoreBufferedStateLocked,
checkAndFlushWithMetadata) accordingly and ensure
telemetry.recordFailedFlush/relevant logs remain.
- Around line 307-328: The Write method on parquetLogExtension directly mutates
ParquetWriter internals (pw.Objs, pw.ObjSize, pw.ObjsSize, pw.CheckSizeCritical)
and omits the pw.stopped check, which couples us to parquet-go internals;
refactor by introducing a small wrapper/abstraction around ParquetWriter that
exposes safe operations (e.g., AppendBuffered(obj any) and FlushIfNeeded()) and
move the size/obj bookkeeping into that wrapper, or at minimum add a pw.stopped
check and encapsulate all field reads/writes behind helper methods on
parquetLogExtension (e.g., ensureStoppedCheck(), updateSizing()) so the direct
field access is centralized and documented as intentional divergence from
ParquetWriter.Write.
---
Nitpick comments:
In `@extension/parquetlogencodingextension/extension.go`:
- Around line 282-305: snapshotBufferedStateLocked currently makes a shallow
copy of e.writer.Objs into bufferedStateSnapshot.records which can lead to
stale/modified data if record objects are mutated; update
snapshotBufferedStateLocked to perform a defensive deep-copy of each record (or
serialize/clone them) before storing in bufferedStateSnapshot.records, or add a
clear comment in the function and in the bufferedStateSnapshot type documenting
the invariant that records are never mutated after being appended to
e.writer.Objs and that restoreBufferedStateLocked assumes immutability;
reference the functions snapshotBufferedStateLocked, restoreBufferedStateLocked,
bufferedStateSnapshot and the writer field e.writer.Objs when applying the
change.
- Around line 29-50: The struct parquetLogExtension currently stores a context
in the ctx field which is discouraged; remove the ctx field and update methods
that need a context (e.g., MarshalLogsWithFlushMetadata, MarshalLogs, and any
call sites that invoke ConvertToParquet) to accept a context.Context parameter
instead of reading from the struct; update the constructor/initializers so they
no longer store ctx and propagate the caller-supplied context into each method
call (and into ConvertToParquet invocations) at the time of use to avoid using a
stale/cancelled context.
- Around line 92-125: The initializeWriter method mutates e.config
(CompressionCodec, PageSizeBytes, RowGroupSizeBytes) which can leak defaults
into shared config; instead, implement a config validation/factory step (e.g.,
validateConfig or NewParquetLogExtension constructor) that applies defaults once
before storing the config, and remove any writes to e.config from
initializeWriter; update initializeWriter (and restore paths) to read the config
immutably and compute local values (e.g., compressionCodec :=
e.config.CompressionCodec; if empty set local to defaultCompressionCodec; same
for pageSize and rowGroupSize) and then use those locals to set
pw.CompressionType, pw.PageSize, and pw.RowGroupSize so no mutation of e.config
occurs.
In `@extension/parquetlogencodingextension/mem.go`:
- Around line 28-51: The Open method overwrites fs.File without closing any
existing handle, causing a potential resource leak; modify MemFile.Open to check
if fs.File is non-nil and close it (handle/propagate the close error
appropriately) before calling fs.Fs.Open and assigning the new file and
fs.FilePath, and ensure similar defensive behavior in MemFile.Create if it can
be called when fs.File is already set; reference the MemFile.Open and
MemFile.Create methods and the fs.File field when making the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: eaab0a41-ec5d-4fcc-9813-470f06a7a145
📒 Files selected for processing (10)
exporter/awss3exporter/config.goexporter/awss3exporter/config_test.goexporter/awss3exporter/internal/upload/partition.goextension/parquetlogencodingextension/Makefileextension/parquetlogencodingextension/adapters/datadog/adapter.goextension/parquetlogencodingextension/adapters/datadog/adapter_test.goextension/parquetlogencodingextension/config.goextension/parquetlogencodingextension/config_test.goextension/parquetlogencodingextension/extension.goextension/parquetlogencodingextension/mem.go
✅ Files skipped from review due to trivial changes (2)
- exporter/awss3exporter/config_test.go
- extension/parquetlogencodingextension/config.go
🚧 Files skipped from review as they are similar to previous changes (4)
- extension/parquetlogencodingextension/config_test.go
- exporter/awss3exporter/config.go
- extension/parquetlogencodingextension/adapters/datadog/adapter_test.go
- extension/parquetlogencodingextension/adapters/datadog/adapter.go
|
Latest contrib commits addressed the stale review threads and added the required changelog entry. Fresh local verification on this head: |
There was a problem hiding this comment.
0 issues found across 2 files (changes from recent commits).
Requires human review: Significant addition of a new extension and legacy S3 features (static auth, key-templating, partitioning). High-impact change on critical data paths requiring human validation.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: no new blocking findings in the follow-up diff; legacy template parsing is now moved off the hot upload path via pre-parsed template wiring.
Cannot approve yet: 7 unresolved review thread(s) remain. Resolve all open threads and I can issue final approval.
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (3)
extension/parquetlogencodingextension/extension.go (2)
51-56:⚠️ Potential issue | 🟡 MinorAvoid panicking on an invalid config type.
baseCfg.(*Config)will panic on any wiring mistake or direct test usage with the wrong config. Returning a typed error here makes startup fail cleanly instead.🛡️ Safe cast
- cfg := baseCfg.(*Config) + cfg, ok := baseCfg.(*Config) + if !ok { + return nil, fmt.Errorf("invalid config type: expected *Config, got %T", baseCfg) + }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/extension.go` around lines 51 - 56, The constructor NewParquetLogExtension currently uses a direct type assertion cfg := baseCfg.(*Config) which will panic on wrong wiring; replace this with a safe type assertion (cfg, ok := baseCfg.(*Config)) and if !ok return a clear typed error from NewParquetLogExtension indicating an invalid config type, so startup fails gracefully rather than panicking; ensure the error message references Config and component.Config to aid debugging.
184-200:⚠️ Potential issue | 🟠 MajorFail closed once the parquet writer has entered an unrecoverable state.
After
WriteStop()succeeds, any path that returns with a failed restore leaves the extension broken, butaddLogRecordWithFlushMetadataand the customWritestill treat the writer as usable. Since parquet-go keeps that stopped state internally, this wrapper needs its ownwriterBrokenflag; otherwise later marshals can keep appending intopw.Objseven though the underlying writer has already been finalized.💡 One safe pattern
type parquetLogExtension struct { ... + writerBroken bool } func (e *parquetLogExtension) initializeWriter() error { ... e.writer = pw e.memFile = memFile.(*MemFile) + e.writerBroken = false return nil } func (e *parquetLogExtension) addLogRecordWithFlushMetadata( records []any, ) ([]byte, string, time.Time, error) { e.mutex.Lock() defer e.mutex.Unlock() + + if e.writerBroken { + return nil, "", time.Time{}, fmt.Errorf("parquet writer is not available") + } ... } if restoreErr := e.restoreBufferedStateLocked(snapshot); restoreErr != nil { + e.writerBroken = true e.logger.Error("failed to restore buffered parquet state", zap.Error(restoreErr)) return nil, "", time.Time{}, fmt.Errorf(...) } func (e *parquetLogExtension) Write(src any) error { + if e.writerBroken { + return fmt.Errorf("parquet writer is not available") + } pw := e.writer ... }In github.com/xitongsys/parquet-go v1.6.2, what does ParquetWriter.Write do after WriteStop, and is the writer's stopped state tracked in an internal/unexported field?Also applies to: 240-266, 307-327
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/extension.go` around lines 184 - 200, The parquet writer wrapper must fail closed when the underlying parquet writer is finalized: add a writerBroken bool field on parquetLogExtension and set it true whenever WriteStop() succeeds or when Write/WriteStop returns an unrecoverable error; update the wrapper's Write method and addLogRecordWithFlushMetadata to first check writerBroken and return an error immediately if set, and mark writerBroken on any write path that detects the parquet writer has been stopped so no further records are appended to pw.Objs; ensure checkAndFlushWithMetadata and recordBufferStateLocked usages respect writerBroken to avoid attempting flushes after finalization.extension/parquetlogencodingextension/adapters/datadog/adapter.go (1)
147-188:⚠️ Potential issue | 🟠 MajorMirror
statusserialization in both adapter branches.The
ddtagspath writesstatusinto bothitem.Statusanditem.Attributes["status"]at Lines 187-188, but the default path only updatesitem.Statusat Line 264. That makes the serializedattributespayload depend solely on whetherresource.ddtagsis a map, and the later resource copy at Lines 250-253 can even backfill a conflicting resource-levelstatuswhen the record already provided one.💡 Minimal fix
if record.SeverityNumber() != 0 && status == "" { status = statusFromSeverityNumber(record.SeverityNumber()) } + item.Attributes["status"] = status item.Status = statusAlso applies to: 230-264
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/adapters/datadog/adapter.go` around lines 147 - 188, The code only writes the computed status into item.Attributes in the ddtags branch but not in the default path, causing inconsistent serialized attributes and possible resource-level backfill; update the default/path that computes status (the logic using record.SeverityText(), record.SeverityNumber(), statusFromSeverityNumber() and the local variable status) to also set item.Attributes["status"]=status in addition to item.Status=status so both branches mirror the same serialization behavior (ensure this assignment occurs after the status is finalized so record-derived status takes precedence over resource-level values).
🧹 Nitpick comments (3)
exporter/awss3exporter/config_test.go (2)
24-29: Make env injection explicit per test to avoid hidden fixture coupling.
loadConfigalways setsS3_PREFIX,S3_ACCESS_KEY_ID, andS3_SECRET_ACCESS_KEY, which can make unrelated fixture tests pass for the wrong reason. Consider passing env overrides into the helper so each test declares only what it needs.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/config_test.go` around lines 24 - 29, The helper loadConfig currently unconditionally sets S3_PREFIX, S3_ACCESS_KEY_ID and S3_SECRET_ACCESS_KEY which hides fixture-specific requirements; change loadConfig(t *testing.T, fixture string) *Config to accept an explicit envOverrides map (or variadic pairs) so callers declare only the envs they need, move the Setenv calls out of the helper into each test (or apply the provided overrides inside loadConfig), and update tests to pass the required S3_* variables explicitly; reference the loadConfig helper and the Config type when making these changes.
198-222: Add the inverse static-credential pairing case for completeness.You already cover
access_key_idwithoutsecret_access_key; add the mirror case (secret_access_keywithoutaccess_key_id) to fully lock in the XOR validation behavior.As per coding guidelines, "All code changes should have tests that actually validate the changes".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/config_test.go` around lines 198 - 222, Add a complementary test that verifies the inverse static-credential pairing (secret without access key) so the XOR validation is fully covered: create a new test function (e.g., TestLoadConfig_StaticCredentialsMustBePaired_SecretOnly) that calls loadConfigFromYAML with YAML supplying secret_access_key but omitting access_key_id, then require an error and assert the error message contains "access_key_id and secret_access_key must be set together" (same assertion pattern as TestLoadConfig_StaticCredentialsMustBePaired).exporter/awss3exporter/exporter_test.go (1)
132-144: Clean uploader stub with configurable delay.The stub correctly tracks upload calls and supports injecting delays for timing tests. Note that the
errfield is defined but not exercised in the current test—consider adding a follow-up test for error outcome telemetry (e.g., verifyingoutcome=failureattributes when upload fails).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/exporter_test.go` around lines 132 - 144, Stub uploaderUploader: the uploaderStub tracks uploadCalls and supports delay but its err field is not used in tests; add a follow-up test that exercises error paths and verifies telemetry. Update tests to call uploaderStub with err set (e.g., uploader.err = errors.New("fail")) and assert the Upload method returns that error and that telemetry/span attributes include outcome=failure; use uploaderStub.uploadCalls to assert the call count and uploaderStub.delay to simulate slow uploads in timing tests; target the uploaderStub type and its Upload method when locating code to change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@exporter/awss3exporter/exporter_test.go`:
- Around line 161-167: The timing assertion is flaky because flushCompletedAt is
set at marshaler creation but the measured flush-to-upload duration is computed
when ConsumeLogs runs; update the test to capture time.Now() immediately before
calling ConsumeLogs and use that timestamp for flushCompletedAt on the
flushMetadataMarshaler (or alternatively relax the ordering assertion by
allowing a small margin), i.e., set flushMetadataMarshaler.buf/flushMeta with
flushMeta.flushCompletedAt = now := time.Now() just before invoking
exporter.ConsumeLogs, then recompute/assert flushToUploadPoint.Sum vs
uploadDurationPoint.Sum using that closer-to-consumption timestamp (or change
the strict comparison to permit a small delta).
In `@extension/parquetlogencodingextension/extension.go`:
- Around line 220-223: The flush duration metric is being seeded from
e.oldestBufferedRecord (flushStartedAt), which inflates flush latency by
including queue residence time; update the code path where flushStartedAt is set
(replace use of e.oldestBufferedRecord with time.Now() at the moment the flush
attempt begins in the flush logic that sets flushStartedAt) and make the
analogous change in the second occurrence around the other block (the lines
referenced ~269-274); add a regression test that enqueues a record, waits/delays
so bufferOldestRecordAgeSeconds reflects the queued age, then triggers a flush
and asserts that the new flush latency metric measures only the flush operation
duration (not the queued age), and ensure the test references the same public
entry point used to invoke flushing.
---
Duplicate comments:
In `@extension/parquetlogencodingextension/adapters/datadog/adapter.go`:
- Around line 147-188: The code only writes the computed status into
item.Attributes in the ddtags branch but not in the default path, causing
inconsistent serialized attributes and possible resource-level backfill; update
the default/path that computes status (the logic using record.SeverityText(),
record.SeverityNumber(), statusFromSeverityNumber() and the local variable
status) to also set item.Attributes["status"]=status in addition to
item.Status=status so both branches mirror the same serialization behavior
(ensure this assignment occurs after the status is finalized so record-derived
status takes precedence over resource-level values).
In `@extension/parquetlogencodingextension/extension.go`:
- Around line 51-56: The constructor NewParquetLogExtension currently uses a
direct type assertion cfg := baseCfg.(*Config) which will panic on wrong wiring;
replace this with a safe type assertion (cfg, ok := baseCfg.(*Config)) and if
!ok return a clear typed error from NewParquetLogExtension indicating an invalid
config type, so startup fails gracefully rather than panicking; ensure the error
message references Config and component.Config to aid debugging.
- Around line 184-200: The parquet writer wrapper must fail closed when the
underlying parquet writer is finalized: add a writerBroken bool field on
parquetLogExtension and set it true whenever WriteStop() succeeds or when
Write/WriteStop returns an unrecoverable error; update the wrapper's Write
method and addLogRecordWithFlushMetadata to first check writerBroken and return
an error immediately if set, and mark writerBroken on any write path that
detects the parquet writer has been stopped so no further records are appended
to pw.Objs; ensure checkAndFlushWithMetadata and recordBufferStateLocked usages
respect writerBroken to avoid attempting flushes after finalization.
---
Nitpick comments:
In `@exporter/awss3exporter/config_test.go`:
- Around line 24-29: The helper loadConfig currently unconditionally sets
S3_PREFIX, S3_ACCESS_KEY_ID and S3_SECRET_ACCESS_KEY which hides
fixture-specific requirements; change loadConfig(t *testing.T, fixture string)
*Config to accept an explicit envOverrides map (or variadic pairs) so callers
declare only the envs they need, move the Setenv calls out of the helper into
each test (or apply the provided overrides inside loadConfig), and update tests
to pass the required S3_* variables explicitly; reference the loadConfig helper
and the Config type when making these changes.
- Around line 198-222: Add a complementary test that verifies the inverse
static-credential pairing (secret without access key) so the XOR validation is
fully covered: create a new test function (e.g.,
TestLoadConfig_StaticCredentialsMustBePaired_SecretOnly) that calls
loadConfigFromYAML with YAML supplying secret_access_key but omitting
access_key_id, then require an error and assert the error message contains
"access_key_id and secret_access_key must be set together" (same assertion
pattern as TestLoadConfig_StaticCredentialsMustBePaired).
In `@exporter/awss3exporter/exporter_test.go`:
- Around line 132-144: Stub uploaderUploader: the uploaderStub tracks
uploadCalls and supports delay but its err field is not used in tests; add a
follow-up test that exercises error paths and verifies telemetry. Update tests
to call uploaderStub with err set (e.g., uploader.err = errors.New("fail")) and
assert the Upload method returns that error and that telemetry/span attributes
include outcome=failure; use uploaderStub.uploadCalls to assert the call count
and uploaderStub.delay to simulate slow uploads in timing tests; target the
uploaderStub type and its Upload method when locating code to change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 328abd49-ecfb-4bd7-af2a-4602350aead0
⛔ Files ignored due to path filters (4)
extension/parquetlogencodingextension/go.sumis excluded by!**/*.sumextension/storage/inmemorystorage/go.sumis excluded by!**/*.sumprocessor/hotreloadprocessor/go.sumis excluded by!**/*.sumprocessor/logstometricsprocessor/go.sumis excluded by!**/*.sum
📒 Files selected for processing (13)
.chloggen/awss3exporter-parquet-compat.yamlexporter/awss3exporter/config.goexporter/awss3exporter/config_test.goexporter/awss3exporter/exporter_test.goexporter/awss3exporter/s3_writer_test.goexporter/loadbalancingexporter/factory.goexporter/loadbalancingexporter/go.modextension/parquetlogencodingextension/adapters/datadog/adapter.goextension/parquetlogencodingextension/extension.goextension/parquetlogencodingextension/go.modextension/storage/inmemorystorage/go.modprocessor/hotreloadprocessor/go.modprocessor/logstometricsprocessor/go.mod
💤 Files with no reviewable changes (1)
- exporter/loadbalancingexporter/factory.go
✅ Files skipped from review due to trivial changes (4)
- .chloggen/awss3exporter-parquet-compat.yaml
- extension/parquetlogencodingextension/go.mod
- exporter/awss3exporter/s3_writer_test.go
- extension/storage/inmemorystorage/go.mod
🚧 Files skipped from review as they are similar to previous changes (1)
- exporter/awss3exporter/config.go
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
exporter/awss3exporter/s3_writer.go (1)
75-79:⚠️ Potential issue | 🟡 MinorDuplicate endpoint configuration.
The
BaseEndpointis configured twice whenconf.S3Uploader.Endpointis non-empty — once at lines 75-79 and again at lines 93-97. Remove one of these blocks.🔧 Proposed fix — remove duplicate block
- if endpoint := conf.S3Uploader.Endpoint; endpoint != "" { - s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = aws.String(endpoint) - }) - } - var managerOpts []upload.ManagerOptAlso applies to: 93-97
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/s3_writer.go` around lines 75 - 79, The duplicitous configuration sets BaseEndpoint twice when conf.S3Uploader.Endpoint is non-empty; remove one of the identical blocks that append a func to s3Opts which assigns o.BaseEndpoint = aws.String(conf.S3Uploader.Endpoint) (the duplicated s3.Options modifier around conf.S3Uploader.Endpoint) so BaseEndpoint is only set once—keep a single append to s3Opts using that endpoint and delete the other redundant block.
♻️ Duplicate comments (2)
exporter/awss3exporter/internal/upload/partition.go (1)
68-83:⚠️ Potential issue | 🟠 MajorEmpty S3 key returned on template errors can cause upload failures.
When
LegacyS3KeyTemplateParsedis nil and parsing fails (line 76), or when execution fails inbuildLegacyTemplateKey(line 100),Build()returns an empty string. This empty key propagates to S3 upload, which will reject it.Consider falling back to the standard key path instead of returning empty:
🔧 Proposed fix — fallback to standard key on template failure
func (pki *PartitionKeyBuilder) Build(ts time.Time, overridePrefix string) string { + standardKey := path.Join(pki.bucketKeyPrefix(ts, overridePrefix), pki.fileName()) + if pki.LegacyS3KeyTemplate != "" { tmpl := pki.LegacyS3KeyTemplateParsed if tmpl == nil { var err error tmpl, err = parseLegacyTemplate(pki.LegacyS3KeyTemplate) if err != nil { slog.Error("failed to parse legacy s3 key template", "error", err) - return "" + return standardKey } } - return buildLegacyTemplateKey(pki.legacyTemplatePrefix(overridePrefix), tmpl, ts) + if key := buildLegacyTemplateKey(pki.legacyTemplatePrefix(overridePrefix), tmpl, ts); key != "" { + return key + } + return standardKey } - return path.Join(pki.bucketKeyPrefix(ts, overridePrefix), pki.fileName()) + return standardKey }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/internal/upload/partition.go` around lines 68 - 83, The Build method currently returns an empty string on template parse or execution failures which leads to rejected S3 uploads; modify PartitionKeyBuilder.Build so that when parseLegacyTemplate(pki.LegacyS3KeyTemplate) returns an error or when buildLegacyTemplateKey produces an empty result, you log the error and fall back to the standard key path computed by path.Join(pki.bucketKeyPrefix(ts, overridePrefix), pki.fileName()) instead of returning "". Update the error handling around parseLegacyTemplate and the call to buildLegacyTemplateKey (and any callers that propagate an empty string) so they return the fallbackKey on error or empty output, and include contextual logging with pki.legacyTemplatePrefix(overridePrefix) and the template/error details.extension/parquetlogencodingextension/extension.go (1)
223-226:⚠️ Potential issue | 🟠 MajorStart the flush timer at the flush attempt.
Line 223 seeds
flushStartedAtfrome.oldestBufferedRecord, so the new flush histogram includes queue residence time instead of just the stop/read/reinitialize work. That age is already exposed bybufferOldestRecordAgeSeconds, so this makes the latency metric misleading.⏱️ Minimal fix
- flushStartedAt := e.oldestBufferedRecord - if flushStartedAt.IsZero() { - flushStartedAt = time.Now() - } + flushStartedAt := time.Now()Please add a delayed-buffer regression test with this change. As per coding guidelines, "All code changes should have tests that actually validate the changes".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@extension/parquetlogencodingextension/extension.go` around lines 223 - 226, Replace seeding flushStartedAt from e.oldestBufferedRecord with starting the timer at the moment the flush attempt begins (i.e., set flushStartedAt = time.Now()) so the flush histogram measures only flush work time and not queue residence; keep e.oldestBufferedRecord and bufferOldestRecordAgeSeconds unchanged for reporting queue age. Update the code that currently references flushStartedAt (search for the variable name "flushStartedAt" and its use in the flush histogram emission) to use the new start point. Add a regression test (delayed-buffer test) that simulates a buffered record with non‑zero age and asserts that the flush latency histogram excludes the queue residence time while bufferOldestRecordAgeSeconds still reports the record age. Ensure the test fails with the old behavior and passes after this change.
🧹 Nitpick comments (2)
exporter/awss3exporter/s3_writer.go (1)
166-176: Replaceinterface{}withany.Per Go 1.18+ convention and the linter finding, use
anyinstead ofinterface{}.♻️ Proposed fix
-func applyServerSideEncryption(parameters interface{}, value string) { +func applyServerSideEncryption(parameters any, value string) {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/s3_writer.go` around lines 166 - 176, The function applyServerSideEncryption currently uses the old empty interface type (interface{}) for its parameters; update its signature to use the Go 1.18+ alias any instead (i.e., change parameters interface{} to parameters any) while leaving the body intact so the type assertions against *s3.PutObjectInput and *s3.CreateMultipartUploadInput and assignment to input.ServerSideEncryption (using s3types.ServerSideEncryption) continue to work as before.exporter/awss3exporter/s3_writer_test.go (1)
110-110: Useatomic.Int32for cleaner atomic operations.The static analysis correctly flags this. Since Go 1.19, prefer the type-safe
atomic.Int32overint32withatomic.AddInt32/atomic.LoadInt32.♻️ Proposed fix
- var stsCalls int32 + var stsCalls atomic.Int32Then update the usages:
- atomic.AddInt32(&stsCalls, 1) + stsCalls.Add(1)- assert.Equal(t, int32(1), atomic.LoadInt32(&stsCalls)) + assert.Equal(t, int32(1), stsCalls.Load())🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@exporter/awss3exporter/s3_writer_test.go` at line 110, Replace the plain int32 counter named stsCalls with the type-safe atomic.Int32 and update all call sites: change the declaration var stsCalls int32 to var stsCalls atomic.Int32 (ensure sync/atomic is imported), replace atomic.AddInt32(&stsCalls, 1) with stsCalls.Add(1) and atomic.LoadInt32(&stsCalls) with stsCalls.Load(), and similarly switch any other atomic.*Int32 helper uses to the atomic.Int32 method equivalents in the tests referencing stsCalls.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@extension/parquetlogencodingextension/extension.go`:
- Around line 196-203: The loop currently appends every record without checking
MaxFileSizeBytes, allowing a single large ConvertToParquet result to overshoot;
change the for loop that calls e.Write(record) to incrementally evaluate the
buffered size after each write (or before appending the next record) and trigger
e.checkAndFlushWithMetadata(true, flushReasonSize) (or the appropriate flush
path) when the buffer would exceed MaxFileSizeBytes so you never exceed the
threshold; ensure you still call e.recordBufferStateLocked() appropriately
around the buffer mutation/size check. Also add a multi-record regression test
that constructs one large ConvertToParquet output followed by small records to
assert the large record causes an immediate flush and the subsequent records are
buffered into a fresh file (validate flush count and resulting blob sizes).
- Around line 258-263: The current code calls e.reinitializeWriterFn() after
successfully obtaining pqBytes and returns an error on failure, which drops a
valid payload; change the flow so that when readWriterBytes (the code path that
produces pqBytes) succeeds but e.reinitializeWriterFn() fails, the function
returns the pqBytes payload and its content-type/metadata (instead of an error)
and defers writer repair to the next write/flush attempt (i.e., record the
failure via e.telemetry.recordFailedFlush and log via e.logger.Error but do not
return the error that discards pqBytes). Update the logic around the call site
that currently does "if err := e.reinitializeWriterFn(); err != nil { ... return
nil, \"\", time.Time{}, err }" to return the payload and a sentinel/zero time as
appropriate so the caller can still upload, and add a regression test covering
the readWriterBytes-succeeds + reinitializeWriterFn-fails branch to assert the
payload is returned and reinit is retried later.
---
Outside diff comments:
In `@exporter/awss3exporter/s3_writer.go`:
- Around line 75-79: The duplicitous configuration sets BaseEndpoint twice when
conf.S3Uploader.Endpoint is non-empty; remove one of the identical blocks that
append a func to s3Opts which assigns o.BaseEndpoint =
aws.String(conf.S3Uploader.Endpoint) (the duplicated s3.Options modifier around
conf.S3Uploader.Endpoint) so BaseEndpoint is only set once—keep a single append
to s3Opts using that endpoint and delete the other redundant block.
---
Duplicate comments:
In `@exporter/awss3exporter/internal/upload/partition.go`:
- Around line 68-83: The Build method currently returns an empty string on
template parse or execution failures which leads to rejected S3 uploads; modify
PartitionKeyBuilder.Build so that when
parseLegacyTemplate(pki.LegacyS3KeyTemplate) returns an error or when
buildLegacyTemplateKey produces an empty result, you log the error and fall back
to the standard key path computed by path.Join(pki.bucketKeyPrefix(ts,
overridePrefix), pki.fileName()) instead of returning "". Update the error
handling around parseLegacyTemplate and the call to buildLegacyTemplateKey (and
any callers that propagate an empty string) so they return the fallbackKey on
error or empty output, and include contextual logging with
pki.legacyTemplatePrefix(overridePrefix) and the template/error details.
In `@extension/parquetlogencodingextension/extension.go`:
- Around line 223-226: Replace seeding flushStartedAt from
e.oldestBufferedRecord with starting the timer at the moment the flush attempt
begins (i.e., set flushStartedAt = time.Now()) so the flush histogram measures
only flush work time and not queue residence; keep e.oldestBufferedRecord and
bufferOldestRecordAgeSeconds unchanged for reporting queue age. Update the code
that currently references flushStartedAt (search for the variable name
"flushStartedAt" and its use in the flush histogram emission) to use the new
start point. Add a regression test (delayed-buffer test) that simulates a
buffered record with non‑zero age and asserts that the flush latency histogram
excludes the queue residence time while bufferOldestRecordAgeSeconds still
reports the record age. Ensure the test fails with the old behavior and passes
after this change.
---
Nitpick comments:
In `@exporter/awss3exporter/s3_writer_test.go`:
- Line 110: Replace the plain int32 counter named stsCalls with the type-safe
atomic.Int32 and update all call sites: change the declaration var stsCalls
int32 to var stsCalls atomic.Int32 (ensure sync/atomic is imported), replace
atomic.AddInt32(&stsCalls, 1) with stsCalls.Add(1) and
atomic.LoadInt32(&stsCalls) with stsCalls.Load(), and similarly switch any other
atomic.*Int32 helper uses to the atomic.Int32 method equivalents in the tests
referencing stsCalls.
In `@exporter/awss3exporter/s3_writer.go`:
- Around line 166-176: The function applyServerSideEncryption currently uses the
old empty interface type (interface{}) for its parameters; update its signature
to use the Go 1.18+ alias any instead (i.e., change parameters interface{} to
parameters any) while leaving the body intact so the type assertions against
*s3.PutObjectInput and *s3.CreateMultipartUploadInput and assignment to
input.ServerSideEncryption (using s3types.ServerSideEncryption) continue to work
as before.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c4bee4df-eee8-4c06-a135-8c62dfd9cc18
📒 Files selected for processing (7)
exporter/awss3exporter/config.goexporter/awss3exporter/config_test.goexporter/awss3exporter/internal/upload/partition.goexporter/awss3exporter/internal/upload/partition_test.goexporter/awss3exporter/s3_writer.goexporter/awss3exporter/s3_writer_test.goextension/parquetlogencodingextension/extension.go
🚧 Files skipped from review as they are similar to previous changes (2)
- exporter/awss3exporter/internal/upload/partition_test.go
- exporter/awss3exporter/config.go
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: no new architectural/correctness findings on follow-up delta (architecture_context=limited).
Validated follow-up changes in awss3exporter/parquetlogencodingextension/loadbalancingexporter; touched package tests pass locally.
Would approve, but 4 unresolved review thread(s) remain.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review: no new architectural/correctness findings on follow-up delta (architecture_context=limited).
Validated follow-up changes in awss3exporter/parquetlogencodingextension/loadbalancingexporter; touched package tests pass locally.
Would approve, but 4 unresolved review thread(s) remain.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@exporter/awss3exporter/s3_writer.go`:
- Around line 40-48: In newUploadManager, add a guard that detects partially
provided static credentials (conf.S3Uploader.AccessKeyID set XOR
conf.S3Uploader.SecretAccessKey set) and fail fast instead of silently falling
back to the default chain; locate the block that appends
config.WithCredentialsProvider using credentials.NewStaticCredentialsProvider
and, if exactly one of AccessKeyID or SecretAccessKey is non-empty, return an
explicit error (or propagate a failure) describing the incomplete S3 static
credentials so callers cannot continue with unintended auth behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d2c87049-b09d-4e38-af8f-85bd32988662
📒 Files selected for processing (6)
exporter/awss3exporter/exporter.goexporter/awss3exporter/exporter_test.goexporter/awss3exporter/s3_writer.goexporter/awss3exporter/s3_writer_test.goexporter/loadbalancingexporter/factory_test.goexporter/loadbalancingexporter/log_exporter_test.go
💤 Files with no reviewable changes (1)
- exporter/loadbalancingexporter/log_exporter_test.go
✅ Files skipped from review due to trivial changes (3)
- exporter/loadbalancingexporter/factory_test.go
- exporter/awss3exporter/exporter_test.go
- exporter/awss3exporter/s3_writer_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- exporter/awss3exporter/exporter.go
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 1891f62. Configure here.
There was a problem hiding this comment.
1 issue found across 14 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="extension/parquetlogencodingextension/extension.go">
<violation number="1" location="extension/parquetlogencodingextension/extension.go:218">
P2: After an in-loop size flush, later buffered records can keep `oldestBufferedRecord` at zero, causing incorrect oldest-record-age telemetry for non-empty buffers.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review, or fix all with cubic.
There was a problem hiding this comment.
[ARCH-REVIEW] Re-review complete: all previously pending thread-gating concerns are resolved, and the follow-up commits are clean.
Scope reviewed: awss3 legacy template pre-parse/validation, parquet extension flush/recovery hardening, module alignment updates, and related test changes.
Verdict: ✅ APPROVE — no blocking/concern/nit findings in the new changes; unresolved threads = 0.

awss3exporter: add legacy awss3 and parquet compatibility
Ports the Sawmills awss3/parquet compatibility layer into the contrib fork while keeping one contrib runtime path underneath.
Description
Test Plan
go test ./...inexporter/awss3exportergo test ./...inextension/parquetlogencodingextensionNote
High Risk
Introduces a new parquet-encoding extension and changes S3 export keying/auth/SSE behavior plus exporter flush/upload telemetry; mistakes could affect log delivery, S3 object layout, or credentials/encryption usage.
Overview
awss3exporternow accepts legacy Sawmills-generated configuration by addings3_partition,access_key_id/secret_access_key,server_side_encryption, ands3_key_template, normalizings3_partitionintos3_partition_format, and validating legacy values/templates.S3 upload behavior is extended to support static AWS credentials, applying SSE headers on uploads, and generating object keys via a legacy Go template path (
.Prefix,.Date,.UUID) with pre-parsed templates.Parquet support is added via a new
parquet_log_encodingextension, which buffers Datadog-shaped OTLP logs and flushes parquet payloads with flush metadata (reason/timestamp) and its own buffer/flush telemetry.Exporter telemetry and flush bridging are expanded: the exporter records flush and upload counters/histograms (including flush-to-upload handoff time) and can consume flush metadata from marshalers; tests and fixtures are added for legacy configs, key templates, SSE/credential behavior, and telemetry. A small change also removes an in-memory queue encoding option from
loadbalancingexporteroptions wiring.Reviewed by Cursor Bugbot for commit b54cf25. Bugbot is set up for automated code reviews on this repo. Configure here.
Summary by cubic
Adds legacy
awss3and parquet compatibility toawss3exporter, supporting legacy config and key templates plus Datadog JSON/parquet encodings with flush metadata and lifecycle telemetry. Also adds theparquet_log_encodingextension and fixes oldest buffered record age telemetry.New Features
awss3exporters3_partition,s3_key_template,access_key_id,secret_access_key,server_side_encryption; normalizess3_partitiontos3_partition_format..Prefix,.Date,.UUID) with base prefix and precompiled templates; supports static creds and SSE headers on upload.parquet_log_encodingextensionBug Fixes
parquet_log_encodingtelemetry.loadbalancingexporter; aligned collector module versions; disabled AWS EC2 metadata lookups in tests.Written for commit b54cf25. Summary will update on new commits.
Summary by CodeRabbit
New Features
Tests
Documentation
Chores