-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcollector.go
88 lines (72 loc) · 2.2 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package zipkin
import (
"encoding/base64"
"fmt"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/mattkanwisher/distributedtrace/gen/scribe"
zkcore "github.com/mattkanwisher/distributedtrace/gen/zipkincore"
zkdep "github.com/mattkanwisher/distributedtrace/gen/zipkindependencies"
)
// Collector accepts scribe log entries, decodes them into Zipkin spans and
// makes the decoded spans available through the channel returned by Receive.
type Collector struct {
// Embedded configuration; the c.Logger used by the methods resolves through it.
*Config
// buffer carries decoded spans from Log to the reader of Receive.
buffer chan *zkcore.Span
}
// NewCollector builds a Collector that uses config as its embedded
// configuration and owns a freshly created span channel.
func NewCollector(config *Config) *Collector {
	c := new(Collector)
	c.Config = config
	// No capacity is given, so the channel is unbuffered: every send performed
	// by Log waits until a reader of Receive takes the span.
	c.buffer = make(chan *zkcore.Span)
	return c
}
// Receive exposes the collector's span channel as receive-only, so callers can
// read the spans pushed by Log but cannot send on or close the channel.
func (c *Collector) Receive() <-chan *zkcore.Span {
	var spans <-chan *zkcore.Span = c.buffer
	return spans
}
// Log decodes a batch of scribe entries into spans and forwards them on the
// collector's channel.
//
// Every entry is decoded before anything is forwarded. If any entry fails to
// decode, the call returns scribe.ResultCode_TRY_LATER together with the
// decode error and no span from the batch is forwarded. On success each span
// is logged and sent on the channel, and scribe.ResultCode_OK is returned.
func (c *Collector) Log(entries []*scribe.LogEntry) (code scribe.ResultCode, e error) {
	decoded := make([]*zkcore.Span, 0, len(entries))
	for i := range entries {
		s, err := spanFromEntry(entries[i])
		if err != nil {
			return scribe.ResultCode_TRY_LATER, err
		}
		decoded = append(decoded, s)
	}

	c.Logger.Printf("Log(): %d span(s) received.", len(decoded))

	for _, s := range decoded {
		if s.ParentId == nil {
			// A span without a parent is logged with parent id 0.
			c.Logger.Printf("span: t:%d i:%d p:%d n:%s", s.TraceId, s.Id, 0, s.Name)
		} else {
			c.Logger.Printf("span: t:%d i:%d p:%d n:%s", s.TraceId, s.Id, *s.ParentId, s.Name)
		}
		// The send waits until the channel accepts the span.
		c.buffer <- s
	}

	return scribe.ResultCode_OK, nil
}
// StoreDependencies is a stub: it logs that it was called, ignores
// dependencies and always returns nil.
func (c *Collector) StoreDependencies(dependencies *zkdep.Dependencies) error {
	const call = "StoreDependencies()"
	c.Logger.Print(call)
	return nil
}
// StoreTopAnnotations is a stub: it logs that it was called, ignores both
// arguments and always returns nil.
func (c *Collector) StoreTopAnnotations(serviceName string, annotations []string) error {
	const call = "StoreTopAnnotations()"
	c.Logger.Print(call)
	return nil
}
// StoreTopKeyValueAnnotations is a stub: it logs that it was called, ignores
// both arguments and always returns nil.
func (c *Collector) StoreTopKeyValueAnnotations(serviceName string, annotations []string) error {
	const call = "StoreTopKeyValueAnnotations()"
	c.Logger.Print(call)
	return nil
}
// spanFromEntry decodes a single scribe log entry into a Zipkin span.
//
// entry.Message is decoded as standard base64; the resulting bytes are then
// read as a Thrift binary-protocol encoding of zkcore.Span.
//
// It returns a nil span and a non-nil error when entry is nil, when the
// message is not valid base64, when the bytes cannot be copied into the
// Thrift memory buffer, or when the Thrift read fails.
func spanFromEntry(entry *scribe.LogEntry) (span *zkcore.Span, e error) {
	// Guard against a nil element so a bad batch yields an error rather than
	// a nil-pointer panic on entry.Message.
	if entry == nil {
		return nil, fmt.Errorf("nil log entry")
	}

	// TODO: Reuse buffer
	payload, e := base64.StdEncoding.DecodeString(entry.Message)
	if e != nil {
		return nil, e
	}

	// Stage the raw bytes in an in-memory Thrift transport for the protocol
	// reader below.
	transport := thrift.NewTMemoryBuffer()
	written, e := transport.Write(payload)
	if e != nil {
		return nil, e
	}
	if written != len(payload) {
		return nil, fmt.Errorf("buffer copy failure.")
	}

	decoded := &zkcore.Span{}
	if e = decoded.Read(thrift.NewTBinaryProtocol(transport, true, true)); e != nil {
		return nil, e
	}
	return decoded, nil
}