// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"

import (
	"errors"
	"fmt"
	"time"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configretry"
	"go.opentelemetry.io/collector/exporter/exporterhelper"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)

// Config defines configuration for Kafka exporter.
type Config struct {
	TimeoutSettings           exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
	QueueSettings             exporterhelper.QueueConfig   `mapstructure:"sending_queue"`
	configretry.BackOffConfig `mapstructure:"retry_on_failure"`

	// The list of Kafka brokers (default localhost:9092).
	Brokers []string `mapstructure:"brokers"`

	// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
	// each of the provided brokers. It will then do a PTR lookup for each
	// returned IP, and that set of names becomes the broker list. This can be
	// required in SASL environments.
	ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`

	// Kafka protocol version.
	ProtocolVersion string `mapstructure:"protocol_version"`

	// ClientID to configure the Kafka client with. This can be leveraged by
	// Kafka to enforce ACLs, throttling quotas, and more.
	ClientID string `mapstructure:"client_id"`

	// The name of the Kafka topic to export to (default otlp_spans for traces,
	// otlp_metrics for metrics, otlp_logs for logs).
	Topic string `mapstructure:"topic"`

	// TopicFromAttribute is the name of the attribute to use as the topic name.
	TopicFromAttribute string `mapstructure:"topic_from_attribute"`

	// Encoding of messages (default "otlp_proto").
	Encoding string `mapstructure:"encoding"`

	// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
	// Please note: this setting has no effect on Jaeger encodings, since the Jaeger
	// exporters include the trace ID as the message key by default.
	PartitionTracesByID bool `mapstructure:"partition_traces_by_id"`

	// PartitionMetricsByResourceAttributes controls whether metric messages are keyed
	// (and therefore partitioned) by their resource attributes.
	PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

	// PartitionLogsByResourceAttributes controls whether log messages are keyed
	// (and therefore partitioned) by their resource attributes.
	PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`

	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata Metadata `mapstructure:"metadata"`

	// Producer is the namespace for producer properties used only by the Producer.
	Producer Producer `mapstructure:"producer"`

	// Authentication defines the used authentication mechanism.
	Authentication kafka.Authentication `mapstructure:"auth"`
}
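
// For orientation, a collector configuration exercising these fields might look
// like the following sketch. The broker addresses, topic, and tuning values are
// illustrative placeholders, not defaults:
//
//	exporters:
//	  kafka:
//	    brokers: ["broker-1:9092", "broker-2:9092"]
//	    protocol_version: 2.0.0
//	    topic: otlp_spans
//	    encoding: otlp_proto
//	    producer:
//	      max_message_bytes: 1000000
//	      required_acks: -1 # WaitForAll
//	      compression: zstd
//	    metadata:
//	      full: false
//	      retry:
//	        max: 3
//	        backoff: 250ms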

// Metadata defines configuration for retrieving metadata from the broker.
type Metadata struct {
	// Whether to maintain a full set of metadata for all topics, or just
	// the minimal set that has been necessary so far. The full set is simpler
	// and usually more convenient, but can take up a substantial amount of
	// memory if you have many topics and partitions. Defaults to true.
	Full bool `mapstructure:"full"`

	// Retry configuration for metadata.
	// This configuration is useful to avoid race conditions when the broker
	// is starting up at the same time as the collector.
	Retry MetadataRetry `mapstructure:"retry"`
}

// Producer defines configuration for the producer.
type Producer struct {
	// Maximum message bytes the producer will accept to produce.
	MaxMessageBytes int `mapstructure:"max_message_bytes"`

	// RequiredAcks is the number of acknowledgements required to assume that a message has been sent.
	// https://pkg.go.dev/github.com/IBM/[email protected]#RequiredAcks
	// The options are:
	//   0 -> NoResponse.   The broker doesn't send any response.
	//   1 -> WaitForLocal. The broker waits for only the local commit to succeed before responding (default).
	//  -1 -> WaitForAll.   The broker waits for all in-sync replicas to commit before responding.
	RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"`

	// Compression codec used to produce messages.
	// https://pkg.go.dev/github.com/IBM/[email protected]#CompressionCodec
	// The options are: 'none', 'gzip', 'snappy', 'lz4', and 'zstd'.
	Compression string `mapstructure:"compression"`

	// The maximum number of messages the producer will send in a single
	// broker request. Defaults to 0 for unlimited. Similar to
	// `queue.buffering.max.messages` in the JVM producer.
	FlushMaxMessages int `mapstructure:"flush_max_messages"`
}
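
// A minimal sketch of how these producer settings would typically be applied
// to a sarama configuration; applyProducerSettings is a hypothetical helper,
// not part of this package's API.
func applyProducerSettings(cfg *Config, sc *sarama.Config) error {
	sc.Producer.MaxMessageBytes = cfg.Producer.MaxMessageBytes
	sc.Producer.RequiredAcks = cfg.Producer.RequiredAcks
	sc.Producer.Flush.MaxMessages = cfg.Producer.FlushMaxMessages
	// The string codec name is translated via saramaProducerCompressionCodec below.
	codec, err := saramaProducerCompressionCodec(cfg.Producer.Compression)
	if err != nil {
		return err
	}
	sc.Producer.Compression = codec
	return nil
}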

// MetadataRetry defines retry configuration for Metadata.
type MetadataRetry struct {
	// The total number of times to retry a metadata request when the
	// cluster is in the middle of a leader election or at startup (default 3).
	Max int `mapstructure:"max"`

	// How long to wait for leader election to occur before retrying
	// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
	Backoff time.Duration `mapstructure:"backoff"`
}
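
// Similarly, a sketch of how the metadata settings map onto sarama's
// corresponding fields; applyMetadataSettings is hypothetical.
func applyMetadataSettings(cfg *Config, sc *sarama.Config) {
	sc.Metadata.Full = cfg.Metadata.Full
	sc.Metadata.Retry.Max = cfg.Metadata.Retry.Max
	sc.Metadata.Retry.Backoff = cfg.Metadata.Retry.Backoff
}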

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid.
func (cfg *Config) Validate() error {
	if cfg.Producer.RequiredAcks < -1 || cfg.Producer.RequiredAcks > 1 {
		return fmt.Errorf("producer.required_acks has to be between -1 and 1. configured value %v", cfg.Producer.RequiredAcks)
	}
	_, err := saramaProducerCompressionCodec(cfg.Producer.Compression)
	if err != nil {
		return err
	}
	return validateSASLConfig(cfg.Authentication.SASL)
}
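
// A quick sketch of Validate in use; the values below are illustrative. An
// out-of-range required_acks (or an unknown compression codec) is rejected
// before the exporter starts.
func exampleValidate() {
	cfg := &Config{
		Producer: Producer{
			RequiredAcks: sarama.WaitForAll, // -1
			Compression:  "zstd",
		},
	}
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid kafka exporter config:", err)
	}
}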

func validateSASLConfig(c *kafka.SASLConfig) error {
	if c == nil {
		return nil
	}
	if c.Username == "" {
		return errors.New("auth.sasl.username is required")
	}
	if c.Password == "" {
		return errors.New("auth.sasl.password is required")
	}
	switch c.Mechanism {
	case "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512":
		// Do nothing, valid mechanism.
	default:
		return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
	}
	if c.Version < 0 || c.Version > 1 {
		return fmt.Errorf("auth.sasl.version has to be either 0 or 1. configured value %v", c.Version)
	}
	return nil
}
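
// For reference, a SASL block that passes this validation might look like the
// following sketch (values are placeholders):
//
//	auth:
//	  sasl:
//	    username: otel
//	    password: ${env:KAFKA_PASSWORD}
//	    mechanism: SCRAM-SHA-512
//	    version: 1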

func saramaProducerCompressionCodec(compression string) (sarama.CompressionCodec, error) {
	switch compression {
	case "none":
		return sarama.CompressionNone, nil
	case "gzip":
		return sarama.CompressionGZIP, nil
	case "snappy":
		return sarama.CompressionSnappy, nil
	case "lz4":
		return sarama.CompressionLZ4, nil
	case "zstd":
		return sarama.CompressionZSTD, nil
	default:
		return sarama.CompressionNone, fmt.Errorf("producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value %v", compression)
	}
}