Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/42643.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'azureblobexporter'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Added template_enabled option to azureblobexporter blob_name_format to enable or disable template rendering."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42643]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
28 changes: 27 additions & 1 deletion exporter/azureblobexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ The following settings can be optionally configured and have default values:
- metrics (default `metrics`): container to store metrics. default value is `metrics`.
- logs (default `logs`): container to store logs. default value is `logs`.
- traces (default `traces`): container to store traces. default value is `traces`.
- blob_name_format: the final blob name will be blob_name +
- blob_name_format: the final blob name will be blob_name
- template_enabled (default `false`): enables Go template parsing for blob name formats. If parsing fails, it will not throw an error but will log a warning and continue formatting the blob name using other rules.
- metrics_format (default `2006/01/02/metrics_15_04_05.json`): blob name format. The date format follows constants in Golang, refer [here](https://go.dev/src/time/format.go).
- logs_format (default `2006/01/02/logs_15_04_05.json`): blob name format.
- traces_format (default `2006/01/02/traces_15_04_05.json`): blob name format.
Expand All @@ -53,6 +54,24 @@ The following settings can be optionally configured and have default values:
- `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `enabled` is `false`
- `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `enabled` is `false`

### Blob Name Templates

When `template_enabled` is `true`, you can use Go templates in `metrics_format`, `logs_format`, and `traces_format` to create dynamic blob names based on telemetry data. The root object for the template is the telemetry data itself (`pmetric.Metrics`, `plog.Logs`, or `ptrace.Traces`).

The following template functions are available:

| Function | Description | Example |
| ----------------------- | --------------------------------------------------------------------------- | -------------------------------------------------------------------- |
| `getResourceMetricAttr` | Gets a resource attribute from metrics data. | `{{ getResourceMetricAttr . 0 "service.name" }}` |
| `getResourceLogAttr` | Gets a resource attribute from logs data. | `{{ getResourceLogAttr . 0 "service.name" }}` |
| `getResourceSpanAttr` | Gets a resource attribute from traces data. | `{{ getResourceSpanAttr . 0 "service.name" }}` |
| `getScopeMetricAttr` | Gets a scope attribute from metrics data. | `{{ getScopeMetricAttr . 0 0 "scope.name" }}` |
| `getScopeLogAttr` | Gets a scope attribute from logs data. | `{{ getScopeLogAttr . 0 0 "scope.name" }}` |
| `getScopeSpanAttr` | Gets a scope attribute from traces data. | `{{ getScopeSpanAttr . 0 0 "scope.name" }}` |
| `getMetric` | Gets a metric object. You can chain to access its fields. | `{{ (getMetric . 0 0 0).Name }}` |
| `getLogRecord` | Gets a log record object. You can chain to access its fields. | `{{ (getLogRecord . 0 0 0).TraceID }}` |
| `getSpan` | Gets a span object. You can chain to access its fields. | `{{ (getSpan . 0 0 0).Name }}` |

An example configuration is provided as follows:

```yaml
Expand All @@ -71,6 +90,11 @@ exporter:
logs: "logs"
metrics: "metrics"
traces: "traces"
blob_name_format:
template_enabled: true
metrics_format: `{{ getResourceMetricAttr . 0 "service.name" }}/2006/01/02/metrics.json`
logs_format: `{{ getScopeLogAttr . 0 0 "scope.name" }}/2006/01/02/logs.json`
traces_format: `{{ (getSpan . 0 0 0).Name }}/2006/01/02/traces.json`
auth:
type: "connection_string"
connection_string: "DefaultEndpointsProtocol=https;AccountName=<your-acount>;AccountKey=<account-key>;EndpointSuffix=core.windows.net"
Expand All @@ -81,6 +105,8 @@ exporter:
separator: "\n"
```

### Append Blob

When `append_blob` is enabled:
- The exporter will create append blobs instead of block blobs
- New data will be appended to existing blobs rather than creating new ones
Expand Down
1 change: 1 addition & 0 deletions exporter/azureblobexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type BlobNameFormat struct {
TracesFormat string `mapstructure:"traces_format"`
SerialNumRange int64 `mapstructure:"serial_num_range"`
SerialNumBeforeExtension bool `mapstructure:"serial_num_before_extension"`
TemplateEnabled bool `mapstructure:"template_enabled"`
Params map[string]string `mapstructure:"params"`
}

Expand Down
176 changes: 162 additions & 14 deletions exporter/azureblobexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math/rand/v2"
"path/filepath"
"strings"
"text/template"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -28,11 +30,115 @@ import (
)

type azureBlobExporter struct {
config *Config
logger *zap.Logger
client azblobClient
signal pipeline.Signal
marshaller *marshaller
config *Config
logger *zap.Logger
client azblobClient
signal pipeline.Signal
marshaller *marshaller
blobNameTemplate *blobNameTemplate
}

type blobNameTemplate struct {
metrics *template.Template
logs *template.Template
traces *template.Template
}

func getAttrStandalone(attrs pcommon.Map, key string) any {
if val, ok := attrs.Get(key); ok {
return val.AsRaw()
}
return nil
}

var tempFuncs = template.FuncMap{
"getResourceSpanAttr": func(traces ptrace.Traces, rmIndex int, key string) any {
if traces.ResourceSpans().Len() > 0 {
rs := traces.ResourceSpans().At(rmIndex)
return getAttrStandalone(rs.Resource().Attributes(), key)
}
return nil
},
"getResourceMetricAttr": func(metrics pmetric.Metrics, rmIndex int, key string) any {
if metrics.ResourceMetrics().Len() > 0 {
rm := metrics.ResourceMetrics().At(rmIndex)
return getAttrStandalone(rm.Resource().Attributes(), key)
}
return nil
},
"getResourceLogAttr": func(logs plog.Logs, rlIndex int, key string) any {
if logs.ResourceLogs().Len() > 0 {
rl := logs.ResourceLogs().At(rlIndex)
return getAttrStandalone(rl.Resource().Attributes(), key)
}
return nil
},
"getScopeSpanAttr": func(traces ptrace.Traces, rmIndex, ilsIndex int, key string) any {
if traces.ResourceSpans().Len() > 0 {
rs := traces.ResourceSpans().At(rmIndex)
if rs.ScopeSpans().Len() > 0 {
ils := rs.ScopeSpans().At(ilsIndex)
return getAttrStandalone(ils.Scope().Attributes(), key)
}
}
return nil
},
"getScopeMetricAttr": func(metrics pmetric.Metrics, rmIndex, ilsIndex int, key string) any {
if metrics.ResourceMetrics().Len() > 0 {
rm := metrics.ResourceMetrics().At(rmIndex)
if rm.ScopeMetrics().Len() > 0 {
ils := rm.ScopeMetrics().At(ilsIndex)
return getAttrStandalone(ils.Scope().Attributes(), key)
}
}
return nil
},
"getScopeLogAttr": func(logs plog.Logs, rlIndex, ilsIndex int, key string) any {
if logs.ResourceLogs().Len() > 0 {
rl := logs.ResourceLogs().At(rlIndex)
if rl.ScopeLogs().Len() > 0 {
ils := rl.ScopeLogs().At(ilsIndex)
return getAttrStandalone(ils.Scope().Attributes(), key)
}
}
return nil
},
"getSpan": func(traces ptrace.Traces, rmIndex, ilsIndex, spanIndex int) any {
if traces.ResourceSpans().Len() > 0 {
rs := traces.ResourceSpans().At(rmIndex)
if rs.ScopeSpans().Len() > 0 {
ils := rs.ScopeSpans().At(ilsIndex)
if ils.Spans().Len() > 0 {
return ils.Spans().At(spanIndex)
}
}
}
return ptrace.Span{}
},
"getMetric": func(metrics pmetric.Metrics, rmIndex, ilsIndex, metricIndex int) any {
if metrics.ResourceMetrics().Len() > 0 {
rm := metrics.ResourceMetrics().At(rmIndex)
if rm.ScopeMetrics().Len() > 0 {
ils := rm.ScopeMetrics().At(ilsIndex)
if ils.Metrics().Len() > 0 {
return ils.Metrics().At(metricIndex)
}
}
}
return pmetric.Metric{}
},
"getLogRecord": func(logs plog.Logs, rlIndex, ilsIndex, logIndex int) any {
if logs.ResourceLogs().Len() > 0 {
rl := logs.ResourceLogs().At(rlIndex)
if rl.ScopeLogs().Len() > 0 {
ils := rl.ScopeLogs().At(ilsIndex)
if ils.LogRecords().Len() > 0 {
return ils.LogRecords().At(logIndex)
}
}
}
return plog.LogRecord{}
},
}

type azblobClient interface {
Expand Down Expand Up @@ -81,9 +187,10 @@ func (c *azblobClientImpl) AppendBlock(ctx context.Context, containerName, blobN

func newAzureBlobExporter(config *Config, logger *zap.Logger, signal pipeline.Signal) *azureBlobExporter {
return &azureBlobExporter{
config: config,
logger: logger,
signal: signal,
config: config,
logger: logger,
signal: signal,
blobNameTemplate: &blobNameTemplate{},
}
}

Expand Down Expand Up @@ -159,25 +266,65 @@ func (e *azureBlobExporter) start(_ context.Context, host component.Host) error
default:
return fmt.Errorf("unsupported authentication type: %s", authType)
}

if e.config.BlobNameFormat.TemplateEnabled {
// pre-parse templates to catch error early
e.blobNameTemplate = &blobNameTemplate{}
var err error

e.blobNameTemplate.metrics, err = template.New("metrics").Funcs(tempFuncs).Parse(e.config.BlobNameFormat.MetricsFormat)
if err != nil {
return fmt.Errorf("failed to parse metrics blob name template: %w", err)
}

e.blobNameTemplate.logs, err = template.New("logs").Funcs(tempFuncs).Parse(e.config.BlobNameFormat.LogsFormat)
if err != nil {
return fmt.Errorf("failed to parse logs blob name template: %w", err)
}

e.blobNameTemplate.traces, err = template.New("traces").Funcs(tempFuncs).Parse(e.config.BlobNameFormat.TracesFormat)
if err != nil {
return fmt.Errorf("failed to parse traces blob name template: %w", err)
}
}

return nil
}

func (e *azureBlobExporter) generateBlobName(signal pipeline.Signal) (string, error) {
func (e *azureBlobExporter) generateBlobName(signal pipeline.Signal, telemetryData any) (string, error) {
// Get current time
now := time.Now()

var format string
var tmpl *template.Template
switch signal {
case pipeline.SignalMetrics:
format = e.config.BlobNameFormat.MetricsFormat
tmpl = e.blobNameTemplate.metrics
case pipeline.SignalLogs:
format = e.config.BlobNameFormat.LogsFormat
tmpl = e.blobNameTemplate.logs
case pipeline.SignalTraces:
format = e.config.BlobNameFormat.TracesFormat
tmpl = e.blobNameTemplate.traces
default:
return "", fmt.Errorf("unsupported signal type: %v", signal)
}
var blobName string

// if template enabled, parse and apply template. if met error, fallback to default blob name format
if e.config.BlobNameFormat.TemplateEnabled {
// Parse and apply template with telemetry data
var buf bytes.Buffer
err := tmpl.Execute(&buf, telemetryData)
if err != nil {
e.logger.Warn("Failed to execute blob name template, using default blob name format", zap.Error(err))
} else {
blobName = buf.String()
format = blobName
}
}

if e.config.BlobNameFormat.SerialNumBeforeExtension {
// Append a random number and do so before the file extension if there is one
ext := filepath.Ext(format)
Expand All @@ -188,6 +335,7 @@ func (e *azureBlobExporter) generateBlobName(signal pipeline.Signal) (string, er
// Appends the random number after any potential file extension to minimize performance impact when high throughput
blobName = fmt.Sprintf("%s_%d", now.Format(format), randomInRange(0, int(e.config.BlobNameFormat.SerialNumRange)))
}

return blobName, nil
}

Expand All @@ -202,7 +350,7 @@ func (e *azureBlobExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metri
return fmt.Errorf("failed to marshal metrics: %w", err)
}

return e.consumeData(ctx, data, pipeline.SignalMetrics)
return e.consumeData(ctx, md, data, pipeline.SignalMetrics)
}

func (e *azureBlobExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
Expand All @@ -212,7 +360,7 @@ func (e *azureBlobExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error
return fmt.Errorf("failed to marshal logs: %w", err)
}

return e.consumeData(ctx, data, pipeline.SignalLogs)
return e.consumeData(ctx, ld, data, pipeline.SignalLogs)
}

func (e *azureBlobExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
Expand All @@ -222,12 +370,12 @@ func (e *azureBlobExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces)
return fmt.Errorf("failed to marshal traces: %w", err)
}

return e.consumeData(ctx, data, pipeline.SignalTraces)
return e.consumeData(ctx, td, data, pipeline.SignalTraces)
}

func (e *azureBlobExporter) consumeData(ctx context.Context, data []byte, signal pipeline.Signal) error {
func (e *azureBlobExporter) consumeData(ctx context.Context, telemetryData any, data []byte, signal pipeline.Signal) error {
// Generate a unique blob name
blobName, err := e.generateBlobName(signal)
blobName, err := e.generateBlobName(signal, telemetryData)
if err != nil {
return fmt.Errorf("failed to generate blobname: %w", err)
}
Expand Down
Loading
Loading