Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/geoipprocessor] Add attributes parameter and consider both source.address and client.address by default #37008

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/geoipprocessor-client-address.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: geoipprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add the `attributes` parameter and consider both `source.address` and `client.address` by default

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37008]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
10 changes: 6 additions & 4 deletions processor/geoipprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

## Description

The geoIP processor `geoipprocessor` enhances the attributes of a span, log, or metric by appending information about the geographical location of an IP address. To add geographical information, the IP address must be included in the attributes using the [`source.address` semantic conventions key attribute](https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/general/attributes.md#source). By default, only the resource attributes will be modified. Please refer to [config.go](./config.go) for the config spec.
The geoIP processor `geoipprocessor` enhances the attributes of a span, log, or metric by appending information about the geographical location of an IP address. To add geographical information, the IP address must be included in the attributes specified by `attributes` ([`client.address`](https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/attributes.md#client-attributes) and [`source.address`](https://github.com/open-telemetry/semantic-conventions/blob/v1.29.0/docs/general/attributes.md#source) by default). By default, only the resource attributes will be modified. Please refer to [config.go](./config.go) for the config spec.
bencehornak marked this conversation as resolved.
Show resolved Hide resolved

### Geographical location metadata

Expand All @@ -36,13 +36,14 @@ The following [resource attributes](./internal/convention/attributes.go) will be

## Configuration

The following settings must be configured:
The following settings can be configured:

- `providers`: A map containing geographical location information providers. These providers are used to search for the geographical location attributes associated with an IP. Supported providers:
- [maxmind](./internal/provider/maxmindprovider/README.md)
- `context`: Allows specifying the underlying telemetry context the processor will work with. Available values:
- `resource`(default): Resource attributes.
- `context` (default: `resource`): Allows specifying the underlying telemetry context the processor will work with. Available values:
- `resource`: Resource attributes.
- `record`: Attributes within a data point, log record or a span.
- `attributes` (default: `[client.address, source.address]`): An array of attribute names, which are used for the IP address lookup
bencehornak marked this conversation as resolved.
Show resolved Hide resolved

## Examples

Expand All @@ -54,4 +55,5 @@ processors:
providers:
maxmind:
database_path: /tmp/mygeodb
attributes: [client.address, source.address, custom.address]
```
8 changes: 8 additions & 0 deletions processor/geoipprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/otel/attribute"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
)
Expand Down Expand Up @@ -43,6 +44,9 @@ type Config struct {

// Context section allows specifying the source type to look for the IP. Available options: resource or record.
Context ContextID `mapstructure:"context"`

// An array of attribute names, which are used for the IP address lookup
Attributes []attribute.Key `mapstructure:"attributes"`
}

var (
Expand All @@ -62,6 +66,10 @@ func (cfg *Config) Validate() error {
}
}

if cfg.Attributes != nil && len(cfg.Attributes) == 0 {
return errors.New("the attributes array must not be empty")
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions processor/geoipprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.opentelemetry.io/otel/attribute"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
Expand All @@ -39,6 +40,7 @@ func TestLoadConfig(t *testing.T) {
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
Attributes: defaultAttributes,
},
},
{
Expand All @@ -48,6 +50,7 @@ func TestLoadConfig(t *testing.T) {
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
Attributes: defaultAttributes,
},
},
{
Expand All @@ -58,6 +61,20 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "invalid_source"),
unmarshalErrorMessage: "unknown context not.an.otlp.context, available values: resource, record",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_source_attributes"),
validateErrorMessage: "the attributes array must not be empty",
},
{
id: component.NewIDWithName(metadata.Type, "custom_source_attributes"),
expected: &Config{
Context: resource,
Providers: map[string]provider.Config{
"maxmind": &maxmind.Config{DatabasePath: "/tmp/db"},
},
Attributes: []attribute.Key{"client.address", "source.address", "custom.address"},
},
},
}

for _, tt := range tests {
Expand Down
18 changes: 11 additions & 7 deletions processor/geoipprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ import (

var (
processorCapabilities = consumer.Capabilities{MutatesData: true}
// defaultResourceAttributes holds a list of default resource attribute keys.
// defaultAttributes holds a list of default resource attribute keys.
// These keys are used to identify an IP address attribute associated with the resource.
defaultResourceAttributes = []attribute.Key{
semconv.SourceAddressKey, // This key represents the standard source address attribute as defined in the OpenTelemetry semantic conventions.
defaultAttributes = []attribute.Key{
// The client attributes are in use by the HTTP semantic conventions
semconv.ClientAddressKey,
// The source attributes are used when there is no client/server relationship between the two sides, or when that relationship is unknown
semconv.SourceAddressKey,
}
)

Expand Down Expand Up @@ -52,7 +55,8 @@ func getProviderFactory(key string) (provider.GeoIPProviderFactory, bool) {
// createDefaultConfig returns a default configuration for the processor.
func createDefaultConfig() component.Config {
return &Config{
Context: resource,
Context: resource,
Attributes: defaultAttributes,
}
}

Expand Down Expand Up @@ -88,7 +92,7 @@ func createMetricsProcessor(ctx context.Context, set processor.Settings, cfg com
if err != nil {
return nil, err
}
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers, set).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processMetrics, processorhelper.WithCapabilities(processorCapabilities))
}

func createTracesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces) (processor.Traces, error) {
Expand All @@ -97,7 +101,7 @@ func createTracesProcessor(ctx context.Context, set processor.Settings, cfg comp
if err != nil {
return nil, err
}
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers, set).processTraces, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewTraces(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processTraces, processorhelper.WithCapabilities(processorCapabilities))
}

func createLogsProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Logs) (processor.Logs, error) {
Expand All @@ -106,5 +110,5 @@ func createLogsProcessor(ctx context.Context, set processor.Settings, cfg compon
if err != nil {
return nil, err
}
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, defaultResourceAttributes, providers, set).processLogs, processorhelper.WithCapabilities(processorCapabilities))
return processorhelper.NewLogs(ctx, set, cfg, nextConsumer, newGeoIPProcessor(geoCfg, providers, set).processLogs, processorhelper.WithCapabilities(processorCapabilities))
}
16 changes: 7 additions & 9 deletions processor/geoipprocessor/geoip_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ var (

// newGeoIPProcessor creates a new instance of geoIPProcessor with the specified fields.
type geoIPProcessor struct {
providers []provider.GeoIPProvider
resourceAttributes []attribute.Key
logger *zap.Logger
providers []provider.GeoIPProvider
logger *zap.Logger

cfg *Config
}

func newGeoIPProcessor(processorConfig *Config, resourceAttributes []attribute.Key, providers []provider.GeoIPProvider, params processor.Settings) *geoIPProcessor {
func newGeoIPProcessor(processorConfig *Config, providers []provider.GeoIPProvider, params processor.Settings) *geoIPProcessor {
return &geoIPProcessor{
resourceAttributes: resourceAttributes,
providers: providers,
cfg: processorConfig,
logger: params.Logger,
providers: providers,
cfg: processorConfig,
logger: params.Logger,
}
}

Expand Down Expand Up @@ -92,7 +90,7 @@ func (g *geoIPProcessor) geoLocation(ctx context.Context, ip net.IP) (attribute.

// processAttributes processes a pcommon.Map by adding geolocation attributes based on the found IP address.
func (g *geoIPProcessor) processAttributes(ctx context.Context, metadata pcommon.Map) error {
ipAddr, err := ipFromAttributes(g.resourceAttributes, metadata)
ipAddr, err := ipFromAttributes(g.cfg.Attributes, metadata)
if err != nil {
// TODO: log IP error not found
if errors.Is(err, errIPNotFound) {
Expand Down
65 changes: 30 additions & 35 deletions processor/geoipprocessor/geoip_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/otel/attribute"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest"
Expand Down Expand Up @@ -82,52 +81,49 @@ var baseProviderMock = providerMock{
}

var testCases = []struct {
name string
goldenDir string
context ContextID
lookupAttributes []attribute.Key
name string
goldenDir string
context ContextID
}{
{
name: "default source.address attribute, not found",
goldenDir: "no_source_address",
context: resource,
lookupAttributes: defaultResourceAttributes,
name: "default source.address attribute, not found",
goldenDir: "resource_no_source_address",
context: resource,
},
{
name: "default source.address attribute",
goldenDir: "source_address",
context: resource,
lookupAttributes: defaultResourceAttributes,
name: "default source.address attribute",
goldenDir: "resource_source_address",
context: resource,
},
{
name: "default source.address attribute no geo metadata found by providers",
goldenDir: "source_address_geo_not_found",
context: resource,
lookupAttributes: defaultResourceAttributes,
name: "default source.address attribute no geo metadata found by providers",
goldenDir: "resource_source_address_geo_not_found",
context: resource,
},
{
name: "default source.ip attribute with an unspecified IP address should be skipped",
goldenDir: "unspecified_address",
context: resource,
lookupAttributes: defaultResourceAttributes,
name: "default source.ip attribute with an unspecified IP address should be skipped",
goldenDir: "resource_unspecified_address",
context: resource,
},
{
name: "custom source attributes",
goldenDir: "custom_sources",
context: resource,
lookupAttributes: []attribute.Key{"ip", "host.ip"},
name: "do not add resource attributes with an invalid ip",
goldenDir: "resource_invalid_address",
context: resource,
},
{
name: "do not add resource attributes with an invalid ip",
goldenDir: "invalid_address",
context: resource,
lookupAttributes: defaultResourceAttributes,
name: "source address located in the record attributes",
goldenDir: "record_source_address",
context: record,
},
{
name: "source address located in inner attributes",
goldenDir: "attribute_source_address",
context: record,
lookupAttributes: defaultResourceAttributes,
name: "client address located in the record attributes",
goldenDir: "record_client_address",
context: record,
},
{
name: "custom address located in the record attributes",
goldenDir: "record_custom_address",
context: record,
},
}

Expand Down Expand Up @@ -205,7 +201,6 @@ func TestProcessor(t *testing.T) {
baseProviderMock.LocationF = func(_ context.Context, sourceIP net.IP) (attribute.Set, error) {
if sourceIP.Equal(net.IPv4(1, 2, 3, 4)) {
return attribute.NewSet([]attribute.KeyValue{
semconv.SourceAddress("1.2.3.4"),
attribute.String(conventions.AttributeGeoCityName, "Boxford"),
attribute.String(conventions.AttributeGeoContinentCode, "EU"),
attribute.String(conventions.AttributeGeoContinentName, "Europe"),
Expand All @@ -226,7 +221,7 @@ func TestProcessor(t *testing.T) {

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{providerKey: &providerConfigMock{}}}
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{providerKey: &providerConfigMock{}}, Attributes: []attribute.Key{"source.address", "client.address", "custom.address"}}
bencehornak marked this conversation as resolved.
Show resolved Hide resolved
compareAllSignals(cfg, tt.goldenDir)(t)
})
}
Expand Down
3 changes: 2 additions & 1 deletion processor/geoipprocessor/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider"
maxmind "github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider/maxmindprovider"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/geoipprocessor/internal/provider/maxmindprovider/testdata"
"go.opentelemetry.io/otel/attribute"
)

func TestProcessorWithMaxMind(t *testing.T) {
Expand All @@ -22,7 +23,7 @@ func TestProcessorWithMaxMind(t *testing.T) {

for _, tt := range testCases {
t.Run("maxmind_"+tt.name, func(t *testing.T) {
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{"maxmind": &maxmindConfig}}
cfg := &Config{Context: tt.context, Providers: map[string]provider.Config{"maxmind": &maxmindConfig}, Attributes: []attribute.Key{"source.address", "client.address", "custom.address"}}

compareAllSignals(cfg, tt.goldenDir)(t)
})
Expand Down
10 changes: 10 additions & 0 deletions processor/geoipprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ geoip/invalid_source:
maxmind:
database_path: /tmp/db
context: not.an.otlp.context
geoip/invalid_source_attributes:
providers:
maxmind:
database_path: /tmp/db
attributes: []
geoip/custom_source_attributes:
providers:
maxmind:
database_path: /tmp/db
attributes: [client.address, source.address, custom.address]

This file was deleted.

Loading