@@ -40,6 +40,7 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
40
40
if err := in .removeSpanIfExists (id ); err != nil {
41
41
return err
42
42
}
43
+
43
44
// trace_id, span_id & parent_id are set as tags
44
45
// because InfluxDB tags are indexed & those values
45
46
// are uselater on queries.
@@ -48,12 +49,14 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
48
49
"span_id" : id .Span .String (),
49
50
"parent_id" : id .Parent .String (),
50
51
}
52
+
51
53
// Saving annotations as InfluxDB measurement spans fields
52
54
// which are not indexed.
53
55
fields := make (map [string ]interface {}, len (anns ))
54
56
for _ , ann := range anns {
55
57
fields [ann .Key ] = string (ann .Value )
56
58
}
59
+
57
60
// InfluxDB point represents a single span.
58
61
pts := []influxDBClient.Point {
59
62
influxDBClient.Point {
@@ -77,18 +80,21 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
77
80
78
81
func (in * InfluxDBStore ) Trace (id ID ) (* Trace , error ) {
79
82
trace := & Trace {}
83
+
80
84
// GROUP BY * -> meaning group by all tags(trace_id, span_id & parent_id)
81
85
// grouping by all tags includes those and it's values on the query response.
82
86
q := fmt .Sprintf ("SELECT * FROM spans WHERE trace_id='%s' GROUP BY *" , id )
83
87
result , err := in .executeOneQuery (q )
84
88
if err != nil {
85
89
return nil , err
86
90
}
91
+
87
92
// result.Series -> A slice containing all the spans.
88
93
if len (result .Series ) == 0 {
89
94
return nil , errors .New ("trace not found" )
90
95
}
91
96
var isRootSpan bool
97
+
92
98
// Iterate over series(spans) to create trace children's & set trace fields.
93
99
for _ , s := range result .Series {
94
100
span , err := newSpanFromRow (& s )
@@ -118,19 +124,23 @@ func (in *InfluxDBStore) Trace(id ID) (*Trace, error) {
118
124
119
125
func (in * InfluxDBStore ) Traces () ([]* Trace , error ) {
120
126
traces := make ([]* Trace , 0 )
127
+
121
128
// GROUP BY * -> meaning group by all tags(trace_id, span_id & parent_id)
122
129
// grouping by all tags includes those and it's values on the query response.
123
130
rootSpansQuery := fmt .Sprintf ("SELECT * FROM spans WHERE parent_id='%s' GROUP BY * LIMIT %d" , zeroID , in .tracesPerPage )
124
131
rootSpansResult , err := in .executeOneQuery (rootSpansQuery )
125
132
if err != nil {
126
133
return nil , err
127
134
}
135
+
128
136
// result.Series -> A slice containing all the spans.
129
137
if len (rootSpansResult .Series ) == 0 {
130
138
return traces , nil
131
139
}
140
+
132
141
// Cache to keep track of traces to be returned.
133
142
tracesCache := make (map [ID ]* Trace , 0 )
143
+
134
144
// Iterate over series(spans) to create traces.
135
145
for _ , s := range rootSpansResult .Series {
136
146
span , err := newSpanFromRow (& s )
@@ -149,23 +159,27 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
149
159
return nil , errors .New ("duplicated root span" )
150
160
}
151
161
}
162
+
152
163
// Using 'OR' since 'IN' not supported yet.
153
164
where := `WHERE `
154
165
var i int = 1
155
166
for _ , trace := range tracesCache {
156
167
where += fmt .Sprintf ("(trace_id='%s' AND parent_id!='%s')" , trace .Span .ID .Trace , zeroID )
168
+
157
169
// Adds 'OR' except for last iteration.
158
170
if i != len (tracesCache ) && len (tracesCache ) > 1 {
159
171
where += " OR "
160
172
}
161
173
i += 1
162
174
}
175
+
163
176
// Queries for all children spans of the traces to be returned.
164
177
childrenSpansQuery := fmt .Sprintf ("SELECT * FROM spans %s GROUP BY *" , where )
165
178
childrenSpansResult , err := in .executeOneQuery (childrenSpansQuery )
166
179
if err != nil {
167
180
return nil , err
168
181
}
182
+
169
183
// Iterate over series(children spans) to create sub-traces
170
184
// and associates sub-traces with it's parent trace.
171
185
for _ , s := range childrenSpansResult .Series {
@@ -220,6 +234,7 @@ func (in *InfluxDBStore) executeOneQuery(command string) (*influxDBClient.Result
220
234
if response .Error () != nil {
221
235
return nil , response .Error ()
222
236
}
237
+
223
238
// Expecting one result, since a single query is executed.
224
239
if len (response .Results ) != 1 {
225
240
return nil , errors .New ("unexpected number of results for an influxdb single query" )
@@ -263,13 +278,15 @@ func annotationsFromRow(r *influxDBModels.Row) (*Annotations, error) {
263
278
if len (r .Values ) == 1 {
264
279
fields = r .Values [0 ]
265
280
}
281
+
266
282
// len(r.Values) might be greater than one - meaning there are
267
283
// some spans to drop, see: InfluxDBStore.Collect(...).
268
284
// If so last one is picked.
269
285
if len (r .Values ) > 1 {
270
286
fields = r .Values [len (r .Values )- 1 ]
271
287
}
272
288
annotations := make (Annotations , len (fields ))
289
+
273
290
// Iterates over fields which represent span's annotation values.
274
291
for i , field := range fields {
275
292
// It is safe to do column[0] (eg. 'Server.Request.Method')
0 commit comments