From 981ffe3d238beed3884d5bcbb5bbaa2e2fb3e961 Mon Sep 17 00:00:00 2001 From: Murphy Chen Date: Tue, 15 Nov 2022 10:50:48 +0800 Subject: [PATCH 1/2] Fix servcegraph (#6) * fix * fix * update --- .chloggen/service_graph_fix.yaml | 2 +- processor/servicegraphprocessor/processor.go | 6 +-- .../servicegraphprocessor/processor_test.go | 52 +++++++++++++++---- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/.chloggen/service_graph_fix.yaml b/.chloggen/service_graph_fix.yaml index c34954a2b7c19..a7e6e3b3d4162 100644 --- a/.chloggen/service_graph_fix.yaml +++ b/.chloggen/service_graph_fix.yaml @@ -5,7 +5,7 @@ change_type: component: servicegraphprocessor # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: fix servicegraph failed to find dimensions. +note: Fix servicegraph failed to find dimensions. # One or more tracking issues related to the change issues: [] diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go index c4eb4d4a5ad30..c21a8b109aff5 100644 --- a/processor/servicegraphprocessor/processor.go +++ b/processor/servicegraphprocessor/processor.go @@ -329,7 +329,7 @@ func buildDimensions(e *store.Edge) pcommon.Map { dims.PutStr("client", e.ClientService) dims.PutStr("server", e.ServerService) dims.PutStr("connection_type", string(e.ConnectionType)) - dims.PutBool("failed", e.Failed) + //dims.PutBool("failed", e.Failed) for k, v := range e.Dimensions { dims.PutStr(k, v) } @@ -374,8 +374,7 @@ func (p *processor) collectCountMetrics(ilm pmetric.ScopeMetrics) error { dpCalls.SetIntValue(value) series.dimensions.CopyTo(dpCalls.Attributes()) - if v, _ := series.dimensions.Get("failed"); v.Equal(pcommon.NewValueBool(true)) { - value, _ = p.reqFailedTotal[key] + if value, ok = p.reqFailedTotal[key]; ok { mCount = ilm.Metrics().AppendEmpty() mCount.SetName("traces_service_graph_request_failed_total") mCount.SetEmptySum().SetIsMonotonic(true) @@ -417,7 +416,6 @@ func (p *processor) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error { // TODO: Support exemplars series.dimensions.CopyTo(dpDuration.Attributes()) - } return nil diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go index f0846972aadb5..00da463863531 100644 --- a/processor/servicegraphprocessor/processor_test.go +++ b/processor/servicegraphprocessor/processor_test.go @@ -104,6 +104,10 @@ func TestProcessorConsume(t *testing.T) { cfg := &Config{ MetricsExporter: "mock", Dimensions: []string{"some-attribute", "non-existing-attribute"}, + Store: StoreConfig{ + MaxItems: 10, + TTL: time.Minute, + }, } mockMetricsExporter := newMockMetricsExporter(func(md pmetric.Metrics) error { @@ -134,7 +138,7 @@ func TestProcessorConsume(t *testing.T) { } func verifyMetrics(t *testing.T, md pmetric.Metrics) error { - assert.Equal(t, 2, md.MetricCount()) + assert.Equal(t, 3, md.MetricCount()) rms := md.ResourceMetrics() assert.Equal(t, 1, rms.Len()) @@ -143,12 +147,12 @@ func verifyMetrics(t *testing.T, md pmetric.Metrics) error { assert.Equal(t, 1, sms.Len()) ms := sms.At(0).Metrics() - assert.Equal(t, 2, ms.Len()) + assert.Equal(t, 3, ms.Len()) mCount := ms.At(0) verifyCount(t, mCount) - mDuration := ms.At(1) + mDuration := ms.At(2) verifyDuration(t, mDuration) return nil @@ -163,14 +167,14 @@ func verifyCount(t *testing.T, m pmetric.Metric) { dp := dps.At(0) assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) - assert.Equal(t, int64(1), dp.IntValue()) + assert.Equal(t, int64(2), dp.IntValue()) attributes := dp.Attributes() assert.Equal(t, 4, attributes.Len()) verifyAttr(t, attributes, "client", "some-service") verifyAttr(t, attributes, "server", "some-service") - verifyAttr(t, attributes, "failed", "false") - verifyAttr(t, attributes, "some-attribute", "val") + verifyAttr(t, attributes, "connection_type", "") + verifyAttr(t, attributes, "client_some-attribute", "val") } func verifyDuration(t *testing.T, m pmetric.Metric) { @@ -181,16 +185,18 @@ func verifyDuration(t *testing.T, m pmetric.Metric) { assert.Equal(t, 1, dps.Len()) dp := dps.At(0) - assert.Equal(t, float64(1000), dp.Sum()) // Duration: 1sec - assert.Equal(t, uint64(1), dp.Count()) - assert.Equal(t, []uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0}, dp.BucketCounts()) + assert.Equal(t, float64(2000), dp.Sum()) // Duration: 1sec + assert.Equal(t, uint64(2), dp.Count()) + except := pcommon.NewUInt64Slice() + except.FromRaw([]uint64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0}) + assert.Equal(t, except, dp.BucketCounts()) attributes := dp.Attributes() assert.Equal(t, 4, attributes.Len()) verifyAttr(t, attributes, "client", "some-service") verifyAttr(t, attributes, "server", "some-service") - verifyAttr(t, attributes, "failed", "false") - verifyAttr(t, attributes, "some-attribute", "val") + verifyAttr(t, attributes, "connection_type", "") + verifyAttr(t, attributes, "client_some-attribute", "val") } func verifyAttr(t *testing.T, attrs pcommon.Map, k, expected string) { @@ -213,6 +219,7 @@ func sampleTraces() ptrace.Traces { traceID := pcommon.TraceID([16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}) clientSpanID := pcommon.SpanID([8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}) + //span one error clientSpan := scopeSpans.Spans().AppendEmpty() clientSpan.SetName("client span") clientSpan.SetSpanID(clientSpanID) @@ -230,6 +237,29 @@ func sampleTraces() ptrace.Traces { serverSpan.SetKind(ptrace.SpanKindServer) serverSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart)) serverSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd)) + serverSpan.Status().SetCode(ptrace.StatusCodeError) + + traceID = [16]byte{0x02, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10} + clientSpanID = [8]byte{0x12, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18} + + //span two right + clientSpan = scopeSpans.Spans().AppendEmpty() + clientSpan.SetName("client span") + clientSpan.SetSpanID(clientSpanID) + clientSpan.SetTraceID(traceID) + clientSpan.SetKind(ptrace.SpanKindClient) + clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart)) + clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd)) + clientSpan.Attributes().PutStr("some-attribute", "val") // Attribute selected as dimension for metrics + + serverSpan = scopeSpans.Spans().AppendEmpty() + serverSpan.SetName("server span") + serverSpan.SetSpanID([8]byte{0x20, 0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26}) + serverSpan.SetTraceID(traceID) + serverSpan.SetParentSpanID(clientSpanID) + serverSpan.SetKind(ptrace.SpanKindServer) + serverSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart)) + serverSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd)) return traces } From bd57ed0a93c511c18db0cbac34221ffe1c21dab7 Mon Sep 17 00:00:00 2001 From: Jared Tan Date: Thu, 1 Dec 2022 15:11:07 +0800 Subject: [PATCH 2/2] enhancement service graph with virtual node. Signed-off-by: Jared Tan --- .../internal/store/edge.go | 7 ++++ .../internal/store/store.go | 42 ++++++++++++++++++- .../internal/store/store_test.go | 3 +- processor/servicegraphprocessor/processor.go | 9 ++++ .../servicegraphprocessor/processor_test.go | 3 ++ .../testdata/service-graph-config-debug.yaml | 38 +++++++++++++++++ 6 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 processor/servicegraphprocessor/testdata/service-graph-config-debug.yaml diff --git a/processor/servicegraphprocessor/internal/store/edge.go b/processor/servicegraphprocessor/internal/store/edge.go index 584e566d33433..3945c64ee28f2 100644 --- a/processor/servicegraphprocessor/internal/store/edge.go +++ b/processor/servicegraphprocessor/internal/store/edge.go @@ -28,6 +28,10 @@ const ( Database ConnectionType = "database" ) +type Peer struct { + RpcAttributes map[string]string +} + // Edge is an Edge between two nodes in the graph type Edge struct { key Key @@ -46,6 +50,8 @@ type Edge struct { // expiration is the time at which the Edge expires, expressed as Unix time expiration time.Time + + Peer Peer } func newEdge(key Key, ttl time.Duration) *Edge { @@ -53,6 +59,7 @@ func newEdge(key Key, ttl time.Duration) *Edge { key: key, Dimensions: make(map[string]string), expiration: time.Now().Add(ttl), + Peer: Peer{RpcAttributes: make(map[string]string)}, } } diff --git a/processor/servicegraphprocessor/internal/store/store.go b/processor/servicegraphprocessor/internal/store/store.go index 84303a81a983b..1e34b7f599154 100644 --- a/processor/servicegraphprocessor/internal/store/store.go +++ b/processor/servicegraphprocessor/internal/store/store.go @@ -17,6 +17,7 @@ package store // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "container/list" "errors" + semconv "go.opentelemetry.io/collector/semconv/v1.13.0" "sync" "time" @@ -25,6 +26,8 @@ import ( var ( ErrTooManyItems = errors.New("too many items") + // NeedToFindAttributes the list of attributes need to matches, the higher the front, the higher the priority. + NeedToFindAttributes = []string{semconv.AttributeNetSockHostAddr, semconv.AttributeRPCService, semconv.AttributeHTTPURL, semconv.AttributeHTTPTarget, semconv.AttributeNetPeerName, semconv.AttributeNetHostName} ) type Callback func(e *Edge) @@ -119,10 +122,47 @@ func (s *Store) Expire() { defer s.mtx.Unlock() // Iterates until no more items can be evicted - for s.tryEvictHead() { + for s.trySpeculateEvictHead() { + s.tryEvictHead() } } +// speculate virtual node before edge get expired. +func (s *Store) trySpeculateEvictHead() bool { + head := s.l.Front() + if head == nil { + return false // list is empty + } + headEdge := head.Value.(*Edge) + if !headEdge.isExpired() { + return false + } + + if len(headEdge.ClientService) == 0 { + headEdge.ClientService = "user" + } + + if len(headEdge.ServerService) == 0 { + headEdge.ServerService = s.getPeerHost(NeedToFindAttributes, headEdge.Peer.RpcAttributes) + } + + if headEdge.isComplete() { + s.onComplete(headEdge) + } + return true +} + +func (s *Store) getPeerHost(m []string, peers map[string]string) string { + peerStr := "unknown" + for _, s := range m { + if len(peers[s]) != 0 { + peerStr = peers[s] + break + } + } + return peerStr +} + // tryEvictHead checks if the oldest item (head of list) can be evicted and will delete it if so. // Returns true if the head was evicted. // diff --git a/processor/servicegraphprocessor/internal/store/store_test.go b/processor/servicegraphprocessor/internal/store/store_test.go index 6e6306f7ca399..002bf41a25cf7 100644 --- a/processor/servicegraphprocessor/internal/store/store_test.go +++ b/processor/servicegraphprocessor/internal/store/store_test.go @@ -136,7 +136,8 @@ func TestStoreExpire(t *testing.T) { s.Expire() assert.Equal(t, 0, s.len()) - assert.Equal(t, 0, onCompletedCount) + // callball onComplete before edge expired. + assert.Equal(t, 100, onCompletedCount) assert.Equal(t, testSize, onExpireCount) } diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go index c21a8b109aff5..63dd311b41278 100644 --- a/processor/servicegraphprocessor/processor.go +++ b/processor/servicegraphprocessor/processor.go @@ -206,6 +206,7 @@ func (p *processor) aggregateMetrics(ctx context.Context, td ptrace.Traces) (err e.ClientLatencySec = float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) e.Failed = e.Failed || span.Status().Code() == ptrace.StatusCodeError p.upsertDimensions(clientKind, e.Dimensions, rAttributes, span.Attributes()) + p.upsertPeerAttributes(store.NeedToFindAttributes, e.Peer.RpcAttributes, span.Attributes()) // A database request will only have one span, we don't wait for the server // span but just copy details from the client span @@ -263,6 +264,14 @@ func (p *processor) upsertDimensions(kind string, m map[string]string, resourceA } } +func (p *processor) upsertPeerAttributes(m []string, peers map[string]string, spanAttr pcommon.Map) { + for _, s := range m { + if v, ok := findAttributeValue(s, spanAttr); ok { + peers[s] = v + } + } +} + func (p *processor) onComplete(e *store.Edge) { p.logger.Debug( "edge completed", diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go index 00da463863531..5803d07840028 100644 --- a/processor/servicegraphprocessor/processor_test.go +++ b/processor/servicegraphprocessor/processor_test.go @@ -228,6 +228,9 @@ func sampleTraces() ptrace.Traces { clientSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(tStart)) clientSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(tEnd)) clientSpan.Attributes().PutStr("some-attribute", "val") // Attribute selected as dimension for metrics + clientSpan.Attributes().PutStr(semconv.AttributeNetPeerIP, "127.0.0.99") + clientSpan.Attributes().PutStr(semconv.AttributeHTTPURL, "https://www.foo.bar/search?q=OpenTelemetry#SemConv") + clientSpan.Attributes().PutStr(semconv.AttributeRPCService, "myservice.EchoService") serverSpan := scopeSpans.Spans().AppendEmpty() serverSpan.SetName("server span") diff --git a/processor/servicegraphprocessor/testdata/service-graph-config-debug.yaml b/processor/servicegraphprocessor/testdata/service-graph-config-debug.yaml new file mode 100644 index 0000000000000..7f1cb2e5bd07b --- /dev/null +++ b/processor/servicegraphprocessor/testdata/service-graph-config-debug.yaml @@ -0,0 +1,38 @@ +receivers: + otlp: + protocols: + grpc: + otlp/processor_metrics: # Dummy receiver for the metrics pipeline + protocols: + grpc: + endpoint: localhost:12345 + +processors: + servicegraph: + metrics_exporter: prometheus/processor_metrics # Exporter to send metrics to + latency_histogram_buckets: [100us, 1ms, 2ms, 6ms, 10ms, 100ms, 250ms] # Buckets for latency histogram + dimensions: [cluster, namespace] # Additional dimensions (labels) to be added to the metrics extracted from the resource and span attributes + store: # Configuration for the in-memory store + ttl: 60s # Value to wait for an edge to be completed + max_items: 200 # Amount of edges that will be stored in the storeMap + +exporters: + prometheus/processor_metrics: + endpoint: "0.0.0.0:8889" + otlp: + endpoint: localhost:4317 + logging: + +service: + telemetry: + logs: + level: debug + pipelines: + traces: + receivers: [otlp] + processors: [servicegraph] + exporters: [logging] + metrics/processor_metrics: + receivers: [otlp/processor_metrics] + processors: [] + exporters: [prometheus/processor_metrics] \ No newline at end of file