Skip to content
Merged
2 changes: 1 addition & 1 deletion receiver/elasticapmintakereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.26.0

require (
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/elastic/apm-data v1.22.0
github.com/elastic/go-elasticsearch/v8 v8.19.6
github.com/elastic/opentelemetry-collector-components/internal/elasticattr v0.40.0
Expand Down Expand Up @@ -39,7 +40,6 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/errdefs v1.0.0 // indirect
github.com/containerd/errdefs/pkg v0.3.0 // indirect
github.com/containerd/log v0.1.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,6 @@ func SetDerivedFieldsForSpan(event *modelpb.APMEvent, attributes pcommon.Map) {
}
}

// SetDerivedResourceAttributes sets resource fields that are NOT part of OTel. These fields are derived by the Enrichment lib in case of OTLP input
func SetDerivedResourceAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
if event.Agent != nil {
attributes.PutStr(elasticattr.AgentName, event.Agent.Name)
attributes.PutStr(elasticattr.AgentVersion, event.Agent.Version)
}
}

// SetDerivedFieldsForMetrics sets fields that are NOT part of OTel for metrics. These fields are derived by the Enrichment lib in case of OTLP input
func SetDerivedFieldsForMetrics(attributes pcommon.Map) {
attributes.PutStr(elasticattr.ProcessorEvent, "metric")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package mappers // import "github.com/elastic/opentelemetry-collector-components

import (
"fmt"
"net/netip"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -351,110 +350,6 @@ func setTransactionMarks(marks map[string]*modelpb.TransactionMark, attributesMa
}
}

// SetElasticSpecificResourceAttributes maps APM event fields to OTel attributes at the resource level.
// The majority of the APM event fields are from the APM metadata model, so this mapping is applicable
// to all event types (OTel signals).
// Some APM events may contain fields that are APM metadata e.g error.context.service.framework will override
// the framework provided in the metadata. The apm-data library handles the override, so this function simply
// sets the resource attribute.
// These fields are not defined by OTel.
// Unlike fields from IntakeV2ToDerivedFields.go, these fields are not used by the UI.
func SetElasticSpecificResourceAttributes(event *modelpb.APMEvent, attributesMap pcommon.Map) {
if event.Cloud != nil {
if event.Cloud.Origin != nil {
putNonEmptyStr(attributesMap, elasticattr.CloudOriginAccountID, event.Cloud.Origin.AccountId)
putNonEmptyStr(attributesMap, elasticattr.CloudOriginProvider, event.Cloud.Origin.Provider)
putNonEmptyStr(attributesMap, elasticattr.CloudOriginRegion, event.Cloud.Origin.Region)
putNonEmptyStr(attributesMap, elasticattr.CloudOriginServiceName, event.Cloud.Origin.ServiceName)
}
putNonEmptyStr(attributesMap, elasticattr.CloudAccountName, event.Cloud.AccountName)
putNonEmptyStr(attributesMap, elasticattr.CloudInstanceID, event.Cloud.InstanceId)
putNonEmptyStr(attributesMap, elasticattr.CloudInstanceName, event.Cloud.InstanceName)
putNonEmptyStr(attributesMap, elasticattr.CloudMachineType, event.Cloud.MachineType)
putNonEmptyStr(attributesMap, elasticattr.CloudProjectID, event.Cloud.ProjectId)
putNonEmptyStr(attributesMap, elasticattr.CloudProjectName, event.Cloud.ProjectName)
}

if event.Faas != nil {
putNonEmptyStr(attributesMap, elasticattr.FaaSTriggerRequestID, event.Faas.TriggerRequestId)
putNonEmptyStr(attributesMap, elasticattr.FaaSExecution, event.Faas.Execution)
}

if event.Agent != nil {
putNonEmptyStr(attributesMap, elasticattr.AgentEphemeralID, event.Agent.EphemeralId)
putNonEmptyStr(attributesMap, elasticattr.AgentActivationMethod, event.Agent.ActivationMethod)
}

if event.Service != nil {
if event.Service.Framework != nil {
putNonEmptyStr(attributesMap, elasticattr.ServiceFrameworkName, event.Service.Framework.Name)
putNonEmptyStr(attributesMap, elasticattr.ServiceFrameworkVersion, event.Service.Framework.Version)
}
if event.Service.Origin != nil {
putNonEmptyStr(attributesMap, elasticattr.ServiceOriginID, event.Service.Origin.Id)
putNonEmptyStr(attributesMap, elasticattr.ServiceOriginName, event.Service.Origin.Name)
putNonEmptyStr(attributesMap, elasticattr.ServiceOriginVersion, event.Service.Origin.Version)
}
}

if event.Host != nil {
putNonEmptyStr(attributesMap, elasticattr.HostHostName, event.Host.Hostname)
}

if event.Source != nil {
if event.Source.Nat != nil && event.Source.Nat.Ip != nil {
ip := modelpb.IP2Addr(event.Source.Nat.Ip)
putNonEmptyStr(attributesMap, elasticattr.SourceNATIP, ip.String())
}
}

if event.User != nil {
putNonEmptyStr(attributesMap, elasticattr.UserDomain, event.User.Domain)
}

if event.Destination != nil {
if event.Destination.Address != "" {
if ip, err := netip.ParseAddr(event.Destination.Address); err == nil {
attributesMap.PutStr(elasticattr.DestinationIP, ip.String())
}
}
}

setLabels(event, attributesMap)
}

// setLabels sets single value label fields from the APMEvent Labels and NumericLabels fields.
// Labels are added as attributes with appropriate key prefixes: "labels." and "numeric_labels.".
// Allows key names with spaces to match existing behavior.
// Ignored empty keys and values.
//
// The apm data library logic will take care of overwriting metadata labels with event labels when decoding
// the input to modelpb.APMEvent, so we simply copy all labels from the event here.
func setLabels(event *modelpb.APMEvent, attributesMap pcommon.Map) {
for key, labelValue := range event.Labels {
if key == "" || labelValue == nil {
continue
}
if labelValue.Value != "" {
attributesMap.PutStr("labels."+key, labelValue.Value)
}
if len(labelValue.Values) > 0 {
labelValues := attributesMap.PutEmptySlice("labels." + key)
labelValues.EnsureCapacity(len(labelValue.Values))
for _, v := range labelValue.Values {
labelValues.AppendEmpty().SetStr(v)
}
}
}

for key, numericLabelValue := range event.NumericLabels {
if key == "" || numericLabelValue == nil {
continue
}
attributesMap.PutDouble("numeric_labels."+key, numericLabelValue.Value)
}
}

// SetElasticSpecificFieldsForLog sets fields that are not defined by OTel.
// Unlike fields from IntakeV2ToDerivedFields.go, these fields are not used by the UI.
func SetElasticSpecificFieldsForLog(event *modelpb.APMEvent, attributesMap pcommon.Map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,11 @@ package mappers // import "github.com/elastic/opentelemetry-collector-components

import (
"go.opentelemetry.io/collector/pdata/pcommon"
semconv22 "go.opentelemetry.io/otel/semconv/v1.22.0"
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"

"github.com/elastic/apm-data/model/modelpb"
)

// TranslateToOtelResourceAttributes translates resource attributes from the Elastic APM model to SemConv resource attributes
func TranslateToOtelResourceAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
if event.Service != nil {
putNonEmptyStr(attributes, string(semconv.ServiceNameKey), event.Service.Name)
putNonEmptyStr(attributes, string(semconv.ServiceVersionKey), event.Service.Version)
if event.Service.Language != nil {
putNonEmptyStr(attributes, string(semconv.TelemetrySDKLanguageKey), event.Service.Language.Name)
putNonEmptyStr(attributes, string(semconv.TelemetrySDKVersionKey), event.Service.Language.Version)
}
if event.Service.Runtime != nil {
putNonEmptyStr(attributes, string(semconv.ProcessRuntimeNameKey), event.Service.Runtime.Name)
putNonEmptyStr(attributes, string(semconv.ProcessRuntimeVersionKey), event.Service.Runtime.Version)
}
attributes.PutStr(string(semconv.TelemetrySDKNameKey), "ElasticAPM")
if event.Service.Environment != "" {
// elasticsearchexporter currently uses v1.22.0 of the OTel SemConv, so we need to include the v1.22.0 attribute
attributes.PutStr(string(semconv22.DeploymentEnvironmentKey), event.Service.Environment)
attributes.PutStr(string(semconv.DeploymentEnvironmentNameKey), event.Service.Environment)
}
if event.Service.Node != nil {
putNonEmptyStr(attributes, string(semconv.ServiceInstanceIDKey), event.Service.Node.Name)
}
}
if event.Host != nil {
putNonEmptyStr(attributes, string(semconv.HostNameKey), event.Host.Name)
putNonEmptyStr(attributes, string(semconv.HostIDKey), event.Host.Id)
putNonEmptyStr(attributes, string(semconv.HostArchKey), event.Host.Architecture)
if event.Host.Os != nil {
putNonEmptyStr(attributes, string(semconv.OSNameKey), event.Host.Os.Name)
putNonEmptyStr(attributes, string(semconv.OSTypeKey), event.Host.Os.Platform)
putNonEmptyStr(attributes, string(semconv.OSVersionKey), event.Host.Os.Version)
}
}

// UserAgent fields are only expected to be available for error and transaction events.
// Translating here since fields should be present at the resource level.
// https://opentelemetry.io/docs/specs/semconv/registry/attributes/user-agent
if event.UserAgent != nil {
putNonEmptyStr(attributes, string(semconv.UserAgentOriginalKey), event.UserAgent.Original)
}

translateCloudAttributes(event, attributes)
translateContainerAndKubernetesAttributes(event, attributes)
translateProcessUserNetworkAttributes(event, attributes)
translateFaasAttributes(event, attributes)
}

// TranslateIntakeV2TransactionToOTelAttributes translates transaction attributes from the Elastic APM model to SemConv attributes
func TranslateIntakeV2TransactionToOTelAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
translateHttpAttributes(event, attributes)
Expand Down Expand Up @@ -128,108 +80,6 @@ func TranslateIntakeV2LogToOTelAttributes(event *modelpb.APMEvent, attributes pc
translateUrlAttributes(event, attributes)
}

func translateCloudAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
if event.Cloud != nil {
putNonEmptyStr(attributes, string(semconv.CloudProviderKey), event.Cloud.Provider)
putNonEmptyStr(attributes, string(semconv.CloudRegionKey), event.Cloud.Region)
putNonEmptyStr(attributes, string(semconv.CloudAvailabilityZoneKey), event.Cloud.AvailabilityZone)
putNonEmptyStr(attributes, string(semconv.CloudAccountIDKey), event.Cloud.AccountId)
putNonEmptyStr(attributes, string(semconv.CloudPlatformKey), event.Cloud.ServiceName)
}
}

func translateContainerAndKubernetesAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
// Container fields
if event.Container != nil {
putNonEmptyStr(attributes, string(semconv.ContainerIDKey), event.Container.Id)
putNonEmptyStr(attributes, string(semconv.ContainerNameKey), event.Container.Name)
putNonEmptyStr(attributes, string(semconv.ContainerRuntimeKey), event.Container.Runtime)
putNonEmptyStr(attributes, string(semconv.ContainerImageNameKey), event.Container.ImageName)
putNonEmptyStr(attributes, string(semconv.ContainerImageTagsKey), event.Container.ImageTag)
}

// Kubernetes fields
if event.Kubernetes != nil {
putNonEmptyStr(attributes, string(semconv.K8SNamespaceNameKey), event.Kubernetes.Namespace)
putNonEmptyStr(attributes, string(semconv.K8SNodeNameKey), event.Kubernetes.NodeName)
putNonEmptyStr(attributes, string(semconv.K8SPodNameKey), event.Kubernetes.PodName)
putNonEmptyStr(attributes, string(semconv.K8SPodUIDKey), event.Kubernetes.PodUid)
}
}

func translateProcessUserNetworkAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
// Process fields
if event.Process != nil {
if event.Process.Pid != 0 {
attributes.PutInt(string(semconv.ProcessPIDKey), int64(event.Process.Pid))
}
if event.Process.Ppid != 0 {
attributes.PutInt(string(semconv.ProcessParentPIDKey), int64(event.Process.Ppid))
}
putNonEmptyStr(attributes, string(semconv.ProcessExecutableNameKey), event.Process.Title)
if len(event.Process.Argv) > 0 {
commandLineArgs := attributes.PutEmptySlice(string(semconv.ProcessCommandLineKey))
commandLineArgs.EnsureCapacity(len(event.Process.Argv))
for _, arg := range event.Process.Argv {
commandLineArgs.AppendEmpty().SetStr(arg)
}
}
putNonEmptyStr(attributes, string(semconv.ProcessExecutablePathKey), event.Process.Executable)
}

// translate user fields defined here: https://opentelemetry.io/docs/specs/semconv/registry/attributes/user
if event.User != nil {
putNonEmptyStr(attributes, string(semconv.UserIDKey), event.User.Id)
putNonEmptyStr(attributes, string(semconv.UserEmailKey), event.User.Email)
putNonEmptyStr(attributes, string(semconv.UserNameKey), event.User.Name)
}

// translate network fields defined here: https://opentelemetry.io/docs/specs/semconv/registry/attributes/network
if event.Network != nil {
if event.Network.Connection != nil {
putNonEmptyStr(attributes, string(semconv.NetworkConnectionTypeKey), event.Network.Connection.Type)
putNonEmptyStr(attributes, string(semconv.NetworkConnectionSubtypeKey), event.Network.Connection.Subtype)
}
if event.Network.Carrier != nil {
putNonEmptyStr(attributes, string(semconv.NetworkCarrierNameKey), event.Network.Carrier.Name)
putNonEmptyStr(attributes, string(semconv.NetworkCarrierMccKey), event.Network.Carrier.Mcc)
putNonEmptyStr(attributes, string(semconv.NetworkCarrierMncKey), event.Network.Carrier.Mnc)
putNonEmptyStr(attributes, string(semconv.NetworkCarrierIccKey), event.Network.Carrier.Icc)
}
}

if event.Client != nil {
translateIPAddress(string(semconv.ClientAddressKey), event.Client.Ip, attributes)
if event.Client.Port != 0 {
attributes.PutInt(string(semconv.ClientPortKey), int64(event.Client.Port))
}
}

if event.Source != nil {
translateIPAddress(string(semconv.SourceAddressKey), event.Source.Ip, attributes)
if event.Source.Port != 0 {
attributes.PutInt(string(semconv.SourcePortKey), int64(event.Source.Port))
}
}
}

func translateIPAddress(key string, ip *modelpb.IP, attributes pcommon.Map) {
if ip != nil {
ipAddr := modelpb.IP2Addr(ip)
putNonEmptyStr(attributes, key, ipAddr.String())
}
}

func translateFaasAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
if event.Faas != nil {
putNonEmptyStr(attributes, string(semconv.FaaSInstanceKey), event.Faas.Id)
putNonEmptyStr(attributes, string(semconv.FaaSNameKey), event.Faas.Name)
putNonEmptyStr(attributes, string(semconv.FaaSVersionKey), event.Faas.Version)
putNonEmptyStr(attributes, string(semconv.FaaSTriggerKey), event.Faas.TriggerType)
putPtrBool(attributes, string(semconv.FaaSColdstartKey), event.Faas.ColdStart)
}
}

func translateHttpAttributes(event *modelpb.APMEvent, attributes pcommon.Map) {
if event.Http != nil {
if event.Http.Request != nil {
Expand Down
Loading