diff --git a/processor/servicegraphprocessor/internal/store/edge.go b/processor/servicegraphprocessor/internal/store/edge.go index 584e566d33433..3945c64ee28f2 100644 --- a/processor/servicegraphprocessor/internal/store/edge.go +++ b/processor/servicegraphprocessor/internal/store/edge.go @@ -28,6 +28,11 @@ const ( Database ConnectionType = "database" ) +// Peer holds attributes copied from a client span, keyed by attribute name; they are used to name the server side of an edge whose server span never arrived. +type Peer struct { + RpcAttributes map[string]string +} + // Edge is an Edge between two nodes in the graph type Edge struct { key Key @@ -46,6 +51,9 @@ type Edge struct { // expiration is the time at which the Edge expires, expressed as Unix time expiration time.Time + + // Peer carries the client-span attributes used to speculate a missing server name when the Edge expires. + Peer Peer } func newEdge(key Key, ttl time.Duration) *Edge { @@ -53,6 +61,7 @@ func newEdge(key Key, ttl time.Duration) *Edge { key: key, Dimensions: make(map[string]string), expiration: time.Now().Add(ttl), + Peer: Peer{RpcAttributes: make(map[string]string)}, } } diff --git a/processor/servicegraphprocessor/internal/store/store.go b/processor/servicegraphprocessor/internal/store/store.go index 84303a81a983b..1e34b7f599154 100644 --- a/processor/servicegraphprocessor/internal/store/store.go +++ b/processor/servicegraphprocessor/internal/store/store.go @@ -17,6 +17,7 @@ package store // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "container/list" "errors" + semconv "go.opentelemetry.io/collector/semconv/v1.13.0" "sync" "time" @@ -25,6 +26,8 @@ import ( var ( ErrTooManyItems = errors.New("too many items") + // NeedToFindAttributes lists the span attributes tried, in order, when naming a peer whose span was never seen; earlier entries take priority.
+ NeedToFindAttributes = []string{semconv.AttributeNetSockPeerAddr, semconv.AttributeRPCService, semconv.AttributeHTTPURL, semconv.AttributeHTTPTarget, semconv.AttributeNetPeerName, semconv.AttributeNetSockPeerName} ) type Callback func(e *Edge) @@ -119,10 +122,47 @@ func (s *Store) Expire() { defer s.mtx.Unlock() // Iterates until no more items can be evicted - for s.tryEvictHead() { + for s.trySpeculateEvictHead() { + s.tryEvictHead() } } +// trySpeculateEvictHead fills in a virtual client ("user") or server (from the peer attributes) on the expired head edge and calls onComplete if that completes it; it returns false when the list is empty or the head has not expired. +func (s *Store) trySpeculateEvictHead() bool { + head := s.l.Front() + if head == nil { + return false // list is empty + } + headEdge := head.Value.(*Edge) + if !headEdge.isExpired() { + return false + } + + if len(headEdge.ClientService) == 0 { + headEdge.ClientService = "user" + } + + if len(headEdge.ServerService) == 0 { + headEdge.ServerService = s.getPeerHost(NeedToFindAttributes, headEdge.Peer.RpcAttributes) + } + + if headEdge.isComplete() { + s.onComplete(headEdge) + } + return true +} + +// getPeerHost returns the first non-empty value in peers for the keys in m, tried in order, +// or "unknown" when none of them is set. +func (s *Store) getPeerHost(m []string, peers map[string]string) string { + for _, attr := range m { + if v := peers[attr]; v != "" { + return v + } + } + return "unknown" +} + // tryEvictHead checks if the oldest item (head of list) can be evicted and will delete it if so. // Returns true if the head was evicted. // diff --git a/processor/servicegraphprocessor/internal/store/store_test.go b/processor/servicegraphprocessor/internal/store/store_test.go index 6e6306f7ca399..002bf41a25cf7 100644 --- a/processor/servicegraphprocessor/internal/store/store_test.go +++ b/processor/servicegraphprocessor/internal/store/store_test.go @@ -136,7 +136,8 @@ func TestStoreExpire(t *testing.T) { s.Expire() assert.Equal(t, 0, s.len()) - assert.Equal(t, 0, onCompletedCount) + // Expire speculates the missing side of an expired edge, so onComplete is expected to fire for each edge before it is evicted.
+ assert.Equal(t, testSize, onCompletedCount) assert.Equal(t, testSize, onExpireCount) } diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go index c21a8b109aff5..63dd311b41278 100644 --- a/processor/servicegraphprocessor/processor.go +++ b/processor/servicegraphprocessor/processor.go @@ -206,6 +206,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err e.ClientLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError p.upsertDimensions(clientKind, e.Dimensions, rAttributes, span.Attributes()) + p.upsertPeerAttributes(store.NeedToFindAttributes, e.Peer.RpcAttributes, span.Attributes()) // A database request will only have one span, we don't wait for the server // span but just copy details from the client span @@ -263,6 +264,15 @@ func (p *processor) upsertDimensions(kind string, m map[string]string, resourceA } } +// upsertPeerAttributes stores in peers, under its own name, every attribute listed in m that findAttributeValue finds in spanAttr. +func (p *processor) upsertPeerAttributes(m []string, peers map[string]string, spanAttr pcommon.Map) { + for _, attr := range m { + if v, ok := findAttributeValue(attr, spanAttr); ok { + peers[attr] = v + } + } +} + func (p *processor) onComplete(e *store.Edge) { p.logger.Debug( "edge completed", diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go index 00da463863531..5803d07840028 100644 --- a/processor/servicegraphprocessor/processor_test.go +++ b/processor/servicegraphprocessor/processor_test.go @@ -228,6 +228,9 @@ func sampleTraces() ptrace.Traces { clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart)) clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd)) clientSpan.Attributes().PutStr("some-attribute", "val") // Attribute selected as dimension for metrics + clientSpan.Attributes().PutStr(semconv.AttributeNetPeerIP, "127.0.0.99")
+ clientSpan.Attributes().PutStr(semconv.AttributeHTTPURL, "https://www.foo.bar/search?q=OpenTelemetry#SemConv") + clientSpan.Attributes().PutStr(semconv.AttributeRPCService, "myservice.EchoService") serverSpan := scopeSpans.Spans().AppendEmpty() serverSpan.SetName("server span") diff --git a/processor/servicegraphprocessor/testdata/service-graph-config-debug.yaml b/processor/servicegraphprocessor/testdata/service-graph-config-debug.yaml new file mode 100644 index 0000000000000..7f1cb2e5bd07b --- /dev/null +++ b/processor/servicegraphprocessor/testdata/service-graph-config-debug.yaml @@ -0,0 +1,38 @@ +receivers: + otlp: + protocols: + grpc: + otlp/processor_metrics: # Dummy receiver for the metrics pipeline + protocols: + grpc: + endpoint: localhost:12345 + +processors: + servicegraph: + metrics_exporter: prometheus/processor_metrics # Exporter to send metrics to + latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 100ms, 250ms] # Buckets for latency histogram + dimensions: [cluster, namespace] # Additional dimensions (labels) to be added to the metrics extracted from the resource and span attributes + store: # Configuration for the in-memory store + ttl: 60s # Time to wait for an edge to be completed + max_items: 200 # Maximum number of edges kept in the store + +exporters: + prometheus/processor_metrics: + endpoint: "0.0.0.0:8889" + otlp: + endpoint: localhost:4317 + logging: + +service: + telemetry: + logs: + level: debug + pipelines: + traces: + receivers: [otlp] + processors: [servicegraph] + exporters: [logging] + metrics/processor_metrics: + receivers: [otlp/processor_metrics] + processors: [] + exporters: [prometheus/processor_metrics]