Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ processor/datadogsemanticsprocessor/ @open-telemetry
processor/deltatocumulativeprocessor/ @open-telemetry/collector-contrib-approvers @sh0rez @RichieSams
processor/deltatorateprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
processor/dnslookupprocessor/ @open-telemetry/collector-contrib-approvers @andrzej-stencel @kaisecheng @edmocosta
processor/enrichmentprocessor/ @open-telemetry/collector-contrib-approvers @sokoide @kyo-ke @sandy2008
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/geoipprocessor/ @open-telemetry/collector-contrib-approvers @andrzej-stencel @michalpristas @rogercoll
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo @echlebek @amdprophet
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ body:
- processor/deltatocumulative
- processor/deltatorate
- processor/dnslookup
- processor/enrichment
- processor/filter
- processor/geoip
- processor/groupbyattrs
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ body:
- processor/deltatocumulative
- processor/deltatorate
- processor/dnslookup
- processor/enrichment
- processor/filter
- processor/geoip
- processor/groupbyattrs
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ body:
- processor/deltatocumulative
- processor/deltatorate
- processor/dnslookup
- processor/enrichment
- processor/filter
- processor/geoip
- processor/groupbyattrs
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ body:
- processor/deltatocumulative
- processor/deltatorate
- processor/dnslookup
- processor/enrichment
- processor/filter
- processor/geoip
- processor/groupbyattrs
Expand Down
1 change: 1 addition & 0 deletions processor/enrichmentprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
141 changes: 141 additions & 0 deletions processor/enrichmentprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Enrichment Processor

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [alpha]: traces, metrics, logs |
| Distributions | [contrib] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fenrichment%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fenrichment) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fenrichment%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fenrichment) |
| Code coverage | [![codecov](https://codecov.io/github/open-telemetry/opentelemetry-collector-contrib/graph/main/badge.svg?component=processor_enrichment)](https://app.codecov.io/gh/open-telemetry/opentelemetry-collector-contrib/tree/main/?components%5B0%5D=processor_enrichment&displayType=list) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@sokoide](https://www.github.com/sokoide), [@kyo-ke](https://www.github.com/kyo-ke), [@sandy2008](https://www.github.com/sandy2008) |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
<!-- end autogenerated section -->

The enrichment processor adds attributes to telemetry by looking up external metadata. It supports enriching:
- Resource attributes (context: "resource")
- Individual span/log/metric data point attributes (context: "individual")

Metadata can be provided via:
- File
- HTTP

Both HTTP and file sources are periodically refreshed.

Accepted data format for metadata is CSV/JSON.

CSV requires a header row and each row should contain same number of value.
JSON should be simple list of key value pair like below:

```json
[
{
"service_name": "user-service",
"owner_team": "platform-team",
"environment": "production",
"region": "us-east-1"
},
{
"service_name": "payment-service",
"owner_team": "payments-team",
"environment": "production",
"region": "us-west-2"
}
]
```

## Configuration

```yaml
processors:
enrichment:
data_sources:
- name: service_inventory
type: http
http:
url: https://example.com/api/services
headers: { }
timeout: 30s
refresh_interval: 5m
- name: host_metadata
type: file
file:
path: /etc/collector/hosts.csv
format: csv # json|csv
refresh_interval: 1m
enrichment_rules:
# Enrich resource attributes using host metadata
- name: enrich_resource_from_host
context: resource
data_source: host_metadata
lookup_attributekey: host.name # attribute in telemetry
lookup_field: hostname # field in data source
mappings:
- source_field: region
target_attribute: cloud.region
- source_field: environment
target_attribute: deployment.environment
# Enrich spans/logs/metrics with service metadata
- name: enrich_individual_from_service
context: individual
data_source: service_inventory
lookup_attributekey: service.name
lookup_field: service_name
mappings:
- source_field: owner_team
target_attribute: team.name
- source_field: environment
target_attribute: deployment.environment
```

## Configuration options

- processors.enrichment
- data_sources: List of external sources to read enrichment data from.
- name: Unique identifier for this source. Required.
- type: "http" or "file". Required.
- http: Required when type=http.
- url: Endpoint returning an array of JSON objects. Required.
- headers: Optional map of request headers.
- timeout: Request timeout (e.g., 30s, 1m). Optional.
- refresh_interval: How often to refetch data (e.g., 5m). Required for periodic refresh.
- json_path: Optional; currently not used. The response must already be an array of objects.
- file: Required when type=file.
- path: Path to the file. Required.
- format: "json" or "csv". Required.
- refresh_interval: How often to reload the file (e.g., 1m). Required for periodic refresh.
- enrichment_rules: List of rules describing how to map source fields to attributes.
- name: Unique rule name. Required.
- context: Where to apply the rule. One of "resource" or "individual". Required.
- data_source: Name of a configured data source to query. Required.
- lookup_attributekey: Attribute in telemetry used as the lookup key (e.g., service.name). Required.
- lookup_field: Field name in the data source to match against (e.g., service_name). Required.
- mappings: List of field mappings to apply. At least one required.
- source_field: Field name in the data source row. Required.
- target_attribute: Attribute to set on the telemetry. Required.

Behavior:
- If multiple rows share the same lookup_field value, the last one loaded wins.
- Empty values in the source row are skipped (no attribute is set).
- Each source is refreshed on its refresh_interval.
- Rule errors are logged; processing continues with other rules.

## Using in a pipeline

```yaml
service:
pipelines:
traces:
processors: [enrichment]
metrics:
processors: [enrichment]
logs:
processors: [enrichment]
```

## Notes
- Context must be one of: resource, individual.
- HTTP and file sources are periodically refreshed using refresh_interval.
- JSON files/endpoints must return an array of objects; CSV requires a header row.
- When multiple rows share the same lookup_field value, the last one read wins.
205 changes: 205 additions & 0 deletions processor/enrichmentprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package enrichmentprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/enrichmentprocessor"

import (
"errors"
"fmt"
"time"
)

const (
// ENRICHCONTEXTRESOURCE indicates that the enrichment rule applies to resource attributes
ENRICHCONTEXTRESOURCE = "resource"
// ENRICHCONTEXTINDIVIDUAL indicates that the enrichment rule applies to log/metric/span attributes
ENRICHCONTEXTINDIVIDUAL = "individual"
)

// Config defines the configuration for the enrichment processor.
type Config struct {
// DataSources defines the external data sources for enrichment
DataSources []DataSourceConfig `mapstructure:"data_sources"`

// EnrichmentRules defines how to enrich telemetry data
EnrichmentRules []EnrichmentRule `mapstructure:"enrichment_rules"`
}

// DataSourceConfig defines configuration for an external data source
type DataSourceConfig struct {
// Name is a unique identifier for this data source
Name string `mapstructure:"name"`

// Type specifies the data source type (http, file)
Type string `mapstructure:"type"`

// HTTP configuration (used when Type is "http")
HTTP *HTTPDataSourceConfig `mapstructure:"http,omitempty"`

// File configuration (used when Type is "file")
File *FileDataSourceConfig `mapstructure:"file,omitempty"`
}

// HTTPDataSourceConfig defines configuration for HTTP-based data sources
type HTTPDataSourceConfig struct {
// URL is the HTTP endpoint URL
URL string `mapstructure:"url"`

// Headers to include in the request
Headers map[string]string `mapstructure:"headers"`

// Timeout for HTTP requests
Timeout time.Duration `mapstructure:"timeout"`

// RefreshInterval specifies how often to refresh the data
RefreshInterval time.Duration `mapstructure:"refresh_interval"`
}

// FileDataSourceConfig defines configuration for file-based data sources
type FileDataSourceConfig struct {
// Path to the file
Path string `mapstructure:"path"`

// Format of the file (json, csv, yaml)
Format string `mapstructure:"format"`

// RefreshInterval specifies how often to check for file changes
RefreshInterval time.Duration `mapstructure:"refresh_interval"`
}

// EnrichmentRule defines how to enrich telemetry data
type EnrichmentRule struct {
// Name is a unique identifier for this rule
Name string `mapstructure:"name"`

// DataSource specifies which data source to use
DataSource string `mapstructure:"data_source"`

// LookupAttributeKey specifies which attribute/field to use for lookup
LookupAttributeKey string `mapstructure:"lookup_attributekey"`

// LookupField specifies which field in the data source to match against
LookupField string `mapstructure:"lookup_field"`

// Mappings define how to map data source fields to telemetry attributes
Mappings []FieldMapping `mapstructure:"mappings"`

// Context specifies which telemetry context to enrich
// Valid values: "resource", "span", "metric", "log"
// Default: applies to all contexts if not specified
Context string `mapstructure:"context"`
}

// FieldMapping defines how to map a field from data source to telemetry attribute
type FieldMapping struct {
// SourceField is the field name in the data source
SourceField string `mapstructure:"source_field"`

// TargetAttribute is the attribute name in telemetry data
TargetAttribute string `mapstructure:"target_attribute"`
}

func (config *Config) Validate() error {
// Allow empty configuration for default config
if len(config.DataSources) == 0 && len(config.EnrichmentRules) == 0 {
return nil
}

if len(config.DataSources) == 0 {
return errors.New("at least one data source must be configured")
}

if len(config.EnrichmentRules) == 0 {
return errors.New("at least one enrichment rule must be configured")
}

// Validate data sources
dataSourceNames := make(map[string]bool)
for _, ds := range config.DataSources {
if ds.Name == "" {
return errors.New("data source name cannot be empty")
}

if dataSourceNames[ds.Name] {
return fmt.Errorf("duplicate data source name: %s", ds.Name)
}
dataSourceNames[ds.Name] = true

if ds.Type == "" {
return fmt.Errorf("data source type cannot be empty for data source: %s", ds.Name)
}

if ds.Type != "http" && ds.Type != "file" {
return fmt.Errorf("unsupported data source type: %s", ds.Type)
}

// Validate type-specific configuration
switch ds.Type {
case "http":
if ds.HTTP == nil {
return fmt.Errorf("HTTP configuration is required for http data source: %s", ds.Name)
}
if ds.HTTP.URL == "" {
return fmt.Errorf("URL is required for HTTP data source: %s", ds.Name)
}
case "file":
if ds.File == nil {
return fmt.Errorf("File configuration is required for file data source: %s", ds.Name)
}
if ds.File.Path == "" {
return fmt.Errorf("Path is required for file data source: %s", ds.Name)
}
}
}

// Validate enrichment rules
ruleNames := make(map[string]bool)
for _, rule := range config.EnrichmentRules {
if rule.Name == "" {
return errors.New("enrichment rule name cannot be empty")
}

if ruleNames[rule.Name] {
return fmt.Errorf("duplicate enrichment rule name: %s", rule.Name)
}
ruleNames[rule.Name] = true

if rule.DataSource == "" {
return fmt.Errorf("data source must be specified for rule: %s", rule.Name)
}

if !dataSourceNames[rule.DataSource] {
return fmt.Errorf("data source %s not found for rule: %s", rule.DataSource, rule.Name)
}

if rule.LookupAttributeKey == "" {
return fmt.Errorf("lookup key must be specified for rule: %s", rule.Name)
}

if rule.LookupField == "" {
return fmt.Errorf("lookup field must be specified for rule: %s", rule.Name)
}

if len(rule.Mappings) == 0 {
return fmt.Errorf("at least one mapping must be specified for rule: %s", rule.Name)
}
// Validate context if specified
if rule.Context != "" {
if rule.Context != ENRICHCONTEXTRESOURCE && rule.Context != ENRICHCONTEXTINDIVIDUAL {
return fmt.Errorf("invalid context value %s for rule %s. Valid values: resource, individual", rule.Context, rule.Name)
}
}
// Validate mappings
for _, mapping := range rule.Mappings {
if mapping.SourceField == "" {
return fmt.Errorf("source field cannot be empty in rule: %s", rule.Name)
}

if mapping.TargetAttribute == "" {
return fmt.Errorf("target attribute cannot be empty in rule: %s", rule.Name)
}
}
}

return nil
}
Loading
Loading