Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions processor/servicegraphprocessor/internal/store/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
Database ConnectionType = "database"
)

type Peer struct {
RpcAttributes map[string]string
}

// Edge is an Edge between two nodes in the graph
type Edge struct {
key Key
Expand All @@ -46,13 +50,16 @@ type Edge struct {

// expiration is the time at which the Edge expires, expressed as Unix time
expiration time.Time

Peer Peer
}

func newEdge(key Key, ttl time.Duration) *Edge {
return &Edge{
key: key,
Dimensions: make(map[string]string),
expiration: time.Now().Add(ttl),
Peer: Peer{RpcAttributes: make(map[string]string)},
}
}

Expand Down
42 changes: 41 additions & 1 deletion processor/servicegraphprocessor/internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package store // import "github.com/open-telemetry/opentelemetry-collector-contr
import (
"container/list"
"errors"
semconv "go.opentelemetry.io/collector/semconv/v1.13.0"
"sync"
"time"

Expand All @@ -25,6 +26,8 @@ import (

var (
ErrTooManyItems = errors.New("too many items")
// NeedToFindAttributes the list of attributes need to matches, the higher the front, the higher the priority.
NeedToFindAttributes = []string{semconv.AttributeNetSockHostAddr, semconv.AttributeRPCService, semconv.AttributeHTTPURL, semconv.AttributeHTTPTarget, semconv.AttributeNetPeerName, semconv.AttributeNetHostName}
)

type Callback func(e *Edge)
Expand Down Expand Up @@ -119,10 +122,47 @@ func (s *Store) Expire() {
defer s.mtx.Unlock()

// Iterates until no more items can be evicted
for s.tryEvictHead() {
for s.trySpeculateEvictHead() {
s.tryEvictHead()
}
}

// speculate virtual node before edge get expired.
func (s *Store) trySpeculateEvictHead() bool {
head := s.l.Front()
if head == nil {
return false // list is empty
}
headEdge := head.Value.(*Edge)
if !headEdge.isExpired() {
return false
}

if len(headEdge.ClientService) == 0 {
headEdge.ClientService = "user"
}

if len(headEdge.ServerService) == 0 {
headEdge.ServerService = s.getPeerHost(NeedToFindAttributes, headEdge.Peer.RpcAttributes)
}

if headEdge.isComplete() {
s.onComplete(headEdge)
}
return true
}

func (s *Store) getPeerHost(m []string, peers map[string]string) string {
peerStr := "unknown"
for _, s := range m {
if len(peers[s]) != 0 {
peerStr = peers[s]
break
}
}
return peerStr
}

// tryEvictHead checks if the oldest item (head of list) can be evicted and will delete it if so.
// Returns true if the head was evicted.
//
Expand Down
3 changes: 2 additions & 1 deletion processor/servicegraphprocessor/internal/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func TestStoreExpire(t *testing.T) {

s.Expire()
assert.Equal(t, 0, s.len())
assert.Equal(t, 0, onCompletedCount)
// callball onComplete before edge expired.
assert.Equal(t, 100, onCompletedCount)
assert.Equal(t, testSize, onExpireCount)
}

Expand Down
9 changes: 9 additions & 0 deletions processor/servicegraphprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err
e.ClientLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds())
e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError
p.upsertDimensions(clientKind, e.Dimensions, rAttributes, span.Attributes())
p.upsertPeerAttributes(store.NeedToFindAttributes, e.Peer.RpcAttributes, span.Attributes())

// A database request will only have one span, we don't wait for the server
// span but just copy details from the client span
Expand Down Expand Up @@ -263,6 +264,14 @@ func (p *processor) upsertDimensions(kind string, m map[string]string, resourceA
}
}

func (p *processor) upsertPeerAttributes(m []string, peers map[string]string, spanAttr pcommon.Map) {
for _, s := range m {
if v, ok := findAttributeValue(s, spanAttr); ok {
peers[s] = v
}
}
}

func (p *processor) onComplete(e *store.Edge) {
p.logger.Debug(
"edge completed",
Expand Down
3 changes: 3 additions & 0 deletions processor/servicegraphprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func sampleTraces() ptrace.Traces {
clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart))
clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd))
clientSpan.Attributes().PutStr("some-attribute", "val") // Attribute selected as dimension for metrics
clientSpan.Attributes().PutStr(semconv.AttributeNetPeerIP, "127.0.0.99")
clientSpan.Attributes().PutStr(semconv.AttributeHTTPURL, "https://www.foo.bar/search?q=OpenTelemetry#SemConv")
clientSpan.Attributes().PutStr(semconv.AttributeRPCService, "myservice.EchoService")

serverSpan := scopeSpans.Spans().AppendEmpty()
serverSpan.SetName("server span")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
receivers:
otlp:
protocols:
grpc:
otlp/processor_metrics: # Dummy receiver for the metrics pipeline
protocols:
grpc:
endpoint: localhost:12345

processors:
servicegraph:
metrics_exporter: prometheus/processor_metrics # Exporter to send metrics to
latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 100ms, 250ms] # Buckets for latency histogram
dimensions: [cluster, namespace] # Additional dimensions (labels) to be added to the metrics extracted from the resource and span attributes
store: # Configuration for the in-memory store
ttl: 60s # Value to wait for an edge to be completed
max_items: 200 # Amount of edges that will be stored in the storeMap

exporters:
prometheus/processor_metrics:
endpoint: "0.0.0.0:8889"
otlp:
endpoint: localhost:4317
logging:

service:
telemetry:
logs:
level: debug
pipelines:
traces:
receivers: [otlp]
processors: [servicegraph]
exporters: [logging]
metrics/processor_metrics:
receivers: [otlp/processor_metrics]
processors: []
exporters: [prometheus/processor_metrics]