Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .chloggen/otelarrowexporter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: OpenTelemetry Protocol with Apache Arrow Exporter
component: exporter/otelarrow

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Implementation copied from opentelemetry/otel-arrow repository @v0.20.0.
note: Implementation copied from opentelemetry/otel-arrow repository @v0.23.0.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26491]
Expand Down
11 changes: 9 additions & 2 deletions exporter/otelarrowexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ The following settings determine the resources that the exporter will use:
- `num_streams` (default: number of CPUs): the number of concurrent Arrow streams
- `max_stream_lifetime` (default: unlimited): duration after which streams are recycled.

When `num_streams` is greater than one, a configurable policy
determines how load is assigned across streams. The supported
policies are `leastloaded`, which picks the stream with the smallest
number of outstanding requests, and `leastloadedN` for `N <=
num_streams`, which limits the decision to a random subset of `N`
streams.

- `prioritizer` (default: "leastloaded"): policy for distributing load across multiple streams.

### Network Configuration

This component uses `round_robin` by default as the gRPC load
Expand Down Expand Up @@ -211,8 +220,6 @@ level](https://arrow.apache.org/docs/format/Columnar.html#format-ipc).

- `payload_compression`: compression applied at the Arrow IPC level, "none" by default, "zstd" supported.

payload_compression: zstd # describes Arrow-IPC compression (default "none")

Compression settings at the Arrow IPC level cannot be further
configured. We do not recommend configuring both payload and
gRPC-level compression at once, hwoever these settings are
Expand Down
36 changes: 18 additions & 18 deletions exporter/otelarrowexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ func createDefaultConfig() component.Config {
}
}

func (oce *baseExporter) helperOptions() []exporterhelper.Option {
func (exp *baseExporter) helperOptions() []exporterhelper.Option {
return []exporterhelper.Option{
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(oce.config.TimeoutSettings),
exporterhelper.WithRetry(oce.config.RetryConfig),
exporterhelper.WithQueue(oce.config.QueueSettings),
exporterhelper.WithStart(oce.start),
exporterhelper.WithShutdown(oce.shutdown),
exporterhelper.WithTimeout(exp.config.TimeoutSettings),
exporterhelper.WithRetry(exp.config.RetryConfig),
exporterhelper.WithQueue(exp.config.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.shutdown),
}
}

Expand All @@ -97,13 +97,13 @@ func createTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
oce, err := newExporter(cfg, set, createArrowTracesStream)
exp, err := newExporter(cfg, set, createArrowTracesStream)
if err != nil {
return nil, err
}
return exporterhelper.NewTracesExporter(ctx, oce.settings, oce.config,
oce.pushTraces,
oce.helperOptions()...,
return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config,
exp.pushTraces,
exp.helperOptions()...,
)
}

Expand All @@ -116,13 +116,13 @@ func createMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
oce, err := newExporter(cfg, set, createArrowMetricsStream)
exp, err := newExporter(cfg, set, createArrowMetricsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewMetricsExporter(ctx, oce.settings, oce.config,
oce.pushMetrics,
oce.helperOptions()...,
return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config,
exp.pushMetrics,
exp.helperOptions()...,
)
}

Expand All @@ -135,12 +135,12 @@ func createLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
oce, err := newExporter(cfg, set, createArrowLogsStream)
exp, err := newExporter(cfg, set, createArrowLogsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewLogsExporter(ctx, oce.settings, oce.config,
oce.pushLogs,
oce.helperOptions()...,
return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config,
exp.pushLogs,
exp.helperOptions()...,
)
}