-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Init Elasticsearch exporter #2324
Changes from 9 commits
66f7074
04f1394
98258d5
cd186b1
9044bc2
bf95ba3
5ce6181
5337d63
63575c0
758b2c2
13243f6
b4d147b
617048e
9255024
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
# Elasticsearch Exporter | ||
|
||
This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www.elastic.co/elasticsearch). | ||
|
||
## Configuration options | ||
|
||
- `endpoints`: List of Elasticsearch URLs. If endpoints and cloudid is missing, the | ||
ELASTICSEARCH_URL environment variable will be used. | ||
- `cloudid` (optional): | ||
[ID](https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html) of the | ||
Elastic Cloud Cluster to publish events to. The `cloudid` can be used instead | ||
of `endpoints`. | ||
- `workers` (optional): Number of workers publishing bulk requests concurrently. | ||
- `index`: The | ||
[index](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html) | ||
or [datastream](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) | ||
name to publish events to. The default value is `logs-generic-default`. | ||
- `pipeline` (optional): Optional [Ingest Node](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) | ||
pipeline ID used for processing documents published by the exporter. | ||
- `flush`: Event bulk buffer flush settings | ||
- `bytes` (default=5242880): Write buffer flush limit. | ||
- `interval` (default=30s): Write buffer time limit. | ||
- `retry`: Event retry settings | ||
- `enabled` (default=true): Enable/Disable event retry on error. Retry | ||
support is enabled by default. | ||
- `max` (default=3): Number of HTTP retry attempts. | ||
- `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed. | ||
- `max_interval` (default=1m): Max waiting time if a HTTP request failed. | ||
- `mapping`: Events are encoded to JSON. The `mapping` allows users to | ||
configure additional mapping rules. | ||
- `mode` (default=ecs): The fields naming mode. valid modes are: | ||
- `none`: Use original fields and event structure from the OTLP event. | ||
- `ecs`: Try to map fields defined in the | ||
[OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/opentelemetry-specification/tree/main/semantic_conventions) | ||
to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html). | ||
- `fields` (optional): Configure additional fields mappings. | ||
- `file` (optional): Read additional field mappings from the provided YAML file. | ||
- `dedup` (default=true): Try to find and remove duplicate fields/attributes | ||
from events before publishing to Elasticsearch. Some structured logging | ||
libraries can produce duplicate fields (for example zap). Elasticsearch | ||
will reject documents that have duplicate fields. | ||
- `dedot` (default=true): When enabled attributes with `.` will be split into | ||
proper json objects. | ||
|
||
### HTTP settings | ||
|
||
- `read_buffer_size` (default=0): Read buffer size. | ||
- `write_buffer_size` (default=0): Write buffer size used when. | ||
- `timeout` (default=90s): HTTP request time limit. | ||
- `headers` (optional): Headers to be send with each HTTP request. | ||
|
||
### Security and Authentication settings | ||
|
||
- `user` (optional): Username used for HTTP Basic Authentication. | ||
- `password` (optional): Password used for HTTP Basic Authentication. | ||
- `api_key` (optional): Authorization [API Key](https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html). | ||
- `ca_file` (optional): Root Certificate Authority (CA) certificate, for | ||
verifying the server's identity, if TLS is enabled. | ||
- `cert_file` (optional): Client TLS certificate. | ||
- `key_file` (optional): Client TLS key. | ||
- `insecure` (optional): Disable verification of the server's identity, if TLS | ||
is enabled. | ||
|
||
### Node Discovery | ||
|
||
The Elasticsearch Exporter will check Elasticsearch regularly for available | ||
nodes and updates the list of hosts if discovery is enabled. Newly discovered | ||
nodes will automatically be used for load balancing. | ||
|
||
- `discover`: | ||
- `on_start` (optional): If enabled the exporter queries Elasticsearch | ||
for all known nodes in the cluster on startup. | ||
- `interval` (optional): Interval to update the list of Elasticsearch nodes. | ||
|
||
## Example | ||
|
||
```yaml | ||
exporters: | ||
elasticsearch: | ||
endpoints: | ||
- "https://localhost:9200" | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
// Copyright 2020, OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package elasticsearchexporter | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"os" | ||
"strings" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/config/configmodels" | ||
"go.opentelemetry.io/collector/config/configtls" | ||
) | ||
|
||
// Config defines configuration for Elastic exporter. | ||
type Config struct { | ||
configmodels.ExporterSettings `mapstructure:",squash"` | ||
|
||
// Endpoints holds the Elasticsearch URLs the exporter should send events to. | ||
// | ||
// This setting is required if CloudID is not set and if the | ||
// ELASTICSEARCH_URL environment variable is not set. | ||
Endpoints []string `mapstructure:"endpoints"` | ||
|
||
// CloudID holds the cloud ID to identify the Elastic Cloud cluster to send events to. | ||
// https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html | ||
// | ||
// This setting is required if no URL is configured. | ||
CloudID string `mapstructure:"cloudid"` | ||
|
||
// Workers configures the number of workers publishing bulk requests. | ||
Workers int `mapstructure:"workers"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider "num_workers" to be consistent with NumConsumers in our sending_queue helper. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
|
||
// Index configures the index, index alias, or data stream name events should be indexed in. | ||
// | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html | ||
// | ||
// This setting is required. | ||
Index string `mapstructure:"index"` | ||
|
||
// Pipeline configures the ingest node pipeline name that should be used to process the | ||
// events. | ||
// | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html | ||
Pipeline string `mapstructure:"pipeline"` | ||
|
||
HTTPClientSettings `mapstructure:",squash"` | ||
jpkrohling marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Discovery DiscoverySettings `mapstructure:"discover"` | ||
Retry RetrySettings `mapstructure:"retry"` | ||
jpkrohling marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Flush FlushSettings `mapstructure:"flush"` | ||
Mapping MappingsSettings `mapstructure:"mapping"` | ||
} | ||
|
||
type HTTPClientSettings struct { | ||
Authentication AuthenticationSettings `mapstructure:",squash"` | ||
|
||
// ReadBufferSize for HTTP client. See http.Transport.ReadBufferSize. | ||
ReadBufferSize int `mapstructure:"read_buffer_size"` | ||
|
||
// WriteBufferSize for HTTP client. See http.Transport.WriteBufferSize. | ||
WriteBufferSize int `mapstructure:"write_buffer_size"` | ||
|
||
// Timeout configures the HTTP request timeout. | ||
Timeout time.Duration `mapstructure:"timeout"` | ||
|
||
// Headers allows users to configure optional HTTP headers that | ||
// will be send with each HTTP request. | ||
Headers map[string]string `mapstructure:"headers,omitempty"` | ||
|
||
configtls.TLSClientSetting `mapstructure:",squash"` | ||
} | ||
|
||
// AuthenticationSettings defines user authentication related settings. | ||
type AuthenticationSettings struct { | ||
// User is used to configure HTTP Basic Authentication. | ||
User string `mapstructure:"user"` | ||
|
||
// Password is used to configure HTTP Basic Authentication. | ||
Password string `mapstructure:"password"` | ||
|
||
// APIKey is used to configure ApiKey based Authentication. | ||
// | ||
// https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html | ||
APIKey string `mapstructure:"api_key"` | ||
} | ||
|
||
// DiscoverySettings defines Elasticsearch node discovery related settings. | ||
// The exporter will check Elasticsearch regularly for available nodes | ||
// and updates the list of hosts if discovery is enabled. Newly discovered | ||
// nodes will automatically be used for load balancing. | ||
// | ||
// DiscoverySettings should not be enabled when operating Elasticsearch behind a proxy | ||
// or load balancer. | ||
// | ||
// https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how | ||
type DiscoverySettings struct { | ||
// OnStart, if set, instructs the exporter to look for available Elasticsearch | ||
// nodes the first time the exporter connects to the cluster. | ||
OnStart bool `mapstructure:"on_start"` | ||
|
||
// Interval instructs the exporter to renew the list of Elasticsearch URLs | ||
// with the given interval. URLs will not be updated if Interval is <=0. | ||
Interval time.Duration `mapstructure:"interval"` | ||
} | ||
|
||
// FlushSettings defines settings for configuring the write buffer flushing | ||
// policy in the Elasticsearch exporter. The exporter sends a bulk request with | ||
// all events already serialized into the send-buffer. | ||
type FlushSettings struct { | ||
// Bytes sets the send buffer flushing limit. | ||
Bytes int `mapstructure:"bytes"` | ||
|
||
// Interval configures the max age of a document in the send buffer. | ||
Interval time.Duration `mapstructure:"interval"` | ||
} | ||
|
||
// RetrySettings defines settings for the HTTP request retries in the Elasticsearch exporter. | ||
// Failed sends are retried with exponential backoff. | ||
type RetrySettings struct { | ||
// Enabled allows users to disable retry without having to comment out all settings. | ||
Enabled bool `mapstructure:"enabled"` | ||
|
||
// Max configures how often an HTTP request is retried before it is assumed to be failed. | ||
Max int `mapstructure:"max"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 Updated field name and configuration name to |
||
|
||
// InitialInterval configures the initial waiting time if a request failed. | ||
InitialInterval time.Duration `mapstructure:"initial_interval"` | ||
|
||
// MaxInterval configures the max waiting time if consecutive requests failed. | ||
MaxInterval time.Duration `mapstructure:"max_interval"` | ||
} | ||
|
||
type MappingsSettings struct { | ||
// Mode configures the field mappings. | ||
Mode string `mapstructure:"mode"` | ||
|
||
// Additional field mappings. | ||
Fields map[string]string `mapstructure:"fields"` | ||
|
||
// File to read additional fields mappings from. | ||
File string `mapstructure:"file"` | ||
|
||
// Try to find and remove duplicate fields | ||
Dedup bool `mapstructure:"dedup"` | ||
|
||
Dedot bool `mapstructure:"dedot"` | ||
} | ||
|
||
type MappingMode int | ||
|
||
const ( | ||
MappingNone MappingMode = iota | ||
MappingECS | ||
) | ||
|
||
var ( | ||
errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified") | ||
errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries") | ||
errConfigNoIndex = errors.New("index must be specified") | ||
) | ||
|
||
func (m MappingMode) String() string { | ||
switch m { | ||
case MappingNone: | ||
return "" | ||
case MappingECS: | ||
return "ecs" | ||
default: | ||
return "" | ||
} | ||
} | ||
|
||
var mappingModes = func() map[string]MappingMode { | ||
table := map[string]MappingMode{} | ||
for _, m := range []MappingMode{ | ||
MappingNone, | ||
MappingECS, | ||
} { | ||
table[strings.ToLower(m.String())] = m | ||
} | ||
|
||
// config aliases | ||
table["no"] = MappingNone | ||
table["none"] = MappingNone | ||
|
||
return table | ||
}() | ||
|
||
const defaultElasticsearchEnvName = "ELASTICSEARCH_URL" | ||
|
||
// Validate validates the elasticsearch server configuration. | ||
func (cfg *Config) Validate() error { | ||
if len(cfg.Endpoints) == 0 && cfg.CloudID == "" { | ||
if os.Getenv(defaultElasticsearchEnvName) == "" { | ||
return errConfigNoEndpoint | ||
} | ||
} | ||
|
||
for _, endpoint := range cfg.Endpoints { | ||
if endpoint == "" { | ||
return errConfigEmptyEndpoint | ||
} | ||
} | ||
|
||
if cfg.Index == "" { | ||
return errConfigNoIndex | ||
} | ||
|
||
if _, ok := mappingModes[cfg.Mapping.Mode]; !ok { | ||
return fmt.Errorf("unknown mapping mode %v", cfg.Mapping.Mode) | ||
} | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does a deployment typically look like? Is it common to have TLS for one server, but not for another? Do they have different sets of certs, or would a client only need one CA that works for all clients? I'm asking because endpoints are typically only
host:port
, without the protocol part, using TLS by default.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, when using TLS one would want to have TLS for the complete cluster.
Ideally one CA cert.
The exporter also accepts
localhost:9200
, but I'm not sure if TLS will be automatically enabled or disabled by default. I assume the client library will expand this tohttp://localhost:9200
if tls is not explicitely configured.I took otlphttpexporter as example, which also uses a full URL in its example:
same for zipkin.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bogdandrutu ^
Interesting, the one from OTLP should have been named URL then, instead of Endpoint. Looks good to me then!