@@ -24,6 +24,9 @@ var _ interface {
24
24
Queryer
25
25
} = (* InfluxDBStore )(nil )
26
26
27
+ // TODO: should be a constant.
28
+ var zeroID = fmt .Sprintf ("%016x" , uint64 (0 ))
29
+
27
30
type InfluxDBStore struct {
28
31
con * influxDBClient.Client // InfluxDB client connection.
29
32
server * influxDBServer.Server // InfluxDB API server.
@@ -117,20 +120,19 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
117
120
traces := make ([]* Trace , 0 )
118
121
// GROUP BY * -> meaning group by all tags(trace_id, span_id & parent_id)
119
122
// grouping by all tags includes those and it's values on the query response.
120
- q := fmt .Sprintf ("SELECT * FROM spans GROUP BY * LIMIT %d" , in .tracesPerPage )
121
- result , err := in .executeOneQuery (q )
123
+ rootSpansQuery := fmt .Sprintf ("SELECT * FROM spans WHERE parent_id='%s' GROUP BY * LIMIT %d" , zeroID , in .tracesPerPage )
124
+ rootSpansResult , err := in .executeOneQuery (rootSpansQuery )
122
125
if err != nil {
123
126
return nil , err
124
127
}
125
128
// result.Series -> A slice containing all the spans.
126
- if len (result .Series ) == 0 {
129
+ if len (rootSpansResult .Series ) == 0 {
127
130
return traces , nil
128
131
}
129
132
// Cache to keep track of traces to be returned.
130
133
tracesCache := make (map [ID ]* Trace , 0 )
131
134
// Iterate over series(spans) to create traces.
132
- for _ , s := range result .Series {
133
- var isRootSpan bool
135
+ for _ , s := range rootSpansResult .Series {
134
136
span , err := newSpanFromRow (& s )
135
137
if err != nil {
136
138
return nil , err
@@ -139,28 +141,52 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
139
141
if err != nil {
140
142
return nil , err
141
143
}
142
- if span .ID .Parent == 0 {
143
- isRootSpan = true
144
+ span .Annotations = * annotations
145
+ _ , present := tracesCache [span .ID .Trace ]
146
+ if ! present {
147
+ tracesCache [span .ID .Trace ] = & Trace {Span : * span }
148
+ } else {
149
+ return nil , errors .New ("duplicated root span" )
150
+ }
151
+ }
152
+ // Using 'OR' since 'IN' not supported yet.
153
+ where := `WHERE `
154
+ var i int = 1
155
+ for _ , trace := range tracesCache {
156
+ where += fmt .Sprintf ("(trace_id='%s' AND parent_id!='%s')" , trace .Span .ID .Trace , zeroID )
157
+ // Adds 'OR' except for last iteration.
158
+ if i != len (tracesCache ) && len (tracesCache ) > 1 {
159
+ where += " OR "
160
+ }
161
+ i += 1
162
+ }
163
+ // Queries for all children spans of the traces to be returned.
164
+ childrenSpansQuery := fmt .Sprintf ("SELECT * FROM spans %s GROUP BY *" , where )
165
+ childreSpansResult , err := in .executeOneQuery (childrenSpansQuery )
166
+ if err != nil {
167
+ return nil , err
168
+ }
169
+ // Iterate over series(children spans) to create sub-traces
170
+ // and associates sub-traces with it's parent trace.
171
+ for _ , s := range childreSpansResult .Series {
172
+ span , err := newSpanFromRow (& s )
173
+ if err != nil {
174
+ return nil , err
175
+ }
176
+ annotations , err := annotationsFromRow (& s )
177
+ if err != nil {
178
+ return nil , err
144
179
}
145
180
span .Annotations = * annotations
146
- if isRootSpan { // root span.
147
- trace , present := tracesCache [span .ID .Trace ]
148
- if ! present {
149
- tracesCache [span .ID .Trace ] = & Trace {Span : * span }
150
- } else { // trace already added just update the span.
151
- trace .Span = * span
152
- }
153
- } else { // children span.
154
- trace , present := tracesCache [span .ID .Trace ]
155
- if ! present { // root trace not added yet.
156
- tracesCache [span .ID .Trace ] = & Trace {Sub : []* Trace {& Trace {Span : * span }}}
157
- } else { // root trace already added so append a sub trace.
158
- trace .Sub = append (trace .Sub , & Trace {Span : * span })
159
- }
181
+ trace , present := tracesCache [span .ID .Trace ]
182
+ if ! present { // Root trace not added.
183
+ return nil , errors .New ("parent not found" )
184
+ } else { // Root trace already added so append a sub-trace.
185
+ trace .Sub = append (trace .Sub , & Trace {Span : * span })
160
186
}
161
187
}
162
- for _ , t := range tracesCache {
163
- traces = append (traces , t )
188
+ for _ , trace := range tracesCache {
189
+ traces = append (traces , trace )
164
190
}
165
191
return traces , nil
166
192
}
0 commit comments