@@ -55,35 +55,32 @@ type InfluxDBStore struct {
55
55
}
56
56
57
57
func (in * InfluxDBStore ) Collect (id SpanID , anns ... Annotation ) error {
58
- // Find a span's point, if found it will be rewritten with new annotations(`anns`)
59
- // if not found, a new span's point will be created .
58
+ // Find a span's point, if found it will be rewritten with new given annotations(`anns`)
59
+ // if not found, a new span's point will be write to `in.dbName` .
60
60
p , err := in .findSpanPoint (id )
61
61
if err != nil {
62
62
return err
63
63
}
64
64
65
- // trace_id, span_id & parent_id are set as tags
66
- // because InfluxDB tags are indexed & those values
67
- // are used later on queries.
65
+ // trace_id, span_id & parent_id are mostly used as part of the "where" part on queries so
66
+ // to have performant queries these are set as tags(InfluxDB indexes tags).
68
67
tags := map [string ]string {
69
68
"trace_id" : id .Trace .String (),
70
69
"span_id" : id .Span .String (),
71
70
"parent_id" : id .Parent .String (),
72
71
}
73
72
74
- // Saving annotations as InfluxDB measurement spans fields
75
- // which are not indexed.
73
+ // Annotations `anns` are set as fields(InfluxDB does not index fields).
76
74
fields := make (map [string ]interface {}, len (anns ))
77
75
for _ , ann := range anns {
78
76
fields [ann .Key ] = string (ann .Value )
79
77
}
80
78
81
- if p != nil { // span exists on DB .
79
+ if p != nil { // span exists on `in.dbName` .
82
80
p .Measurement = spanMeasurementName
83
81
p .Tags = tags
84
82
85
- // Using extendFields & withoutEmptyFields in order to have
86
- // pointFields that only contains:
83
+ // Using extendFields & withoutEmptyFields in order to have pointFields that only contains:
87
84
// - Fields that are not saved on DB.
88
85
// - Fields that are saved but have empty values.
89
86
fields := extendFields (fields , withoutEmptyFields (p .Fields ))
@@ -98,18 +95,18 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
98
95
p .Fields = fields
99
96
} else { // new span to be saved on DB.
100
97
101
- // A field `schemasFieldName` contains all the schemas found on `anns`.
98
+ // `schemasFieldName` field contains all the schemas found on `anns`.
102
99
// Eg. fields[schemasFieldName] = "HTTPClient,HTTPServer"
103
100
fields [schemasFieldName ] = schemasFromAnnotations (anns )
104
101
p = & influxDBClient.Point {
105
102
Measurement : spanMeasurementName ,
106
- Tags : tags , // indexed metadata.
107
- Fields : fields , // non-indexed metadata.
103
+ Tags : tags ,
104
+ Fields : fields ,
108
105
Time : time .Now ().UTC (),
109
106
}
110
107
}
111
108
112
- // InfluxDB point represents a single span.
109
+ // A single point represents one span.
113
110
pts := []influxDBClient.Point {* p }
114
111
bps := influxDBClient.BatchPoints {
115
112
Points : pts ,
@@ -138,7 +135,7 @@ func (in *InfluxDBStore) Trace(id ID) (*Trace, error) {
138
135
return nil , errors .New ("trace not found" )
139
136
}
140
137
141
- // Iterate over series(spans) to create trace children's & set trace fields.
138
+ // Iterate over series(spans) to set ` trace` fields.
142
139
var rootSpanSet bool
143
140
for _ , s := range result .Series {
144
141
var isRootSpan bool
@@ -186,7 +183,7 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
186
183
// Cache to keep track of traces to be returned.
187
184
tracesCache := make (map [ID ]* Trace , 0 )
188
185
189
- // Iterate over series(spans) to create traces.
186
+ // Iterate over series(spans) to create root traces.
190
187
for _ , s := range rootSpansResult .Series {
191
188
span , err := newSpanFromRow (& s )
192
189
if err != nil {
@@ -218,15 +215,14 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
218
215
i += 1
219
216
}
220
217
221
- // Queries for all children spans of the traces to be returned .
218
+ // Queries for all children spans of the root traces .
222
219
childrenSpansQuery := fmt .Sprintf ("SELECT * FROM spans %s GROUP BY *" , where )
223
220
childrenSpansResult , err := in .executeOneQuery (childrenSpansQuery )
224
221
if err != nil {
225
222
return nil , err
226
223
}
227
224
228
- // Iterate over series(children spans) to create sub-traces
229
- // and associates sub-traces with it's parent trace.
225
+ // Iterate over series(children spans) to set sub-traces to it's corresponding root trace.
230
226
for _ , s := range childrenSpansResult .Series {
231
227
span , err := newSpanFromRow (& s )
232
228
if err != nil {
@@ -268,7 +264,7 @@ func (in *InfluxDBStore) createDBIfNotExists() error {
268
264
}
269
265
}
270
266
271
- // If no errors query execution was successfully - either DB was created or already exists.
267
+ // If there are no errors, query execution was successfully - either DB was created or already exists.
272
268
response , err := in .con .Query (influxDBClient.Query {Command : q })
273
269
if err != nil {
274
270
return err
@@ -279,8 +275,7 @@ func (in *InfluxDBStore) createDBIfNotExists() error {
279
275
return nil
280
276
}
281
277
282
- // createAdminUserIfNotExists creates an admin user
283
- // using `in.adminUser` credentials if does not exist.
278
+ // createAdminUserIfNotExists finds admin user(`in.adminUser`) if not found it's created.
284
279
func (in * InfluxDBStore ) createAdminUserIfNotExists () error {
285
280
userInfo , err := in .server .MetaClient .Authenticate (in .adminUser .Username , in .adminUser .Password )
286
281
if err == influxDBErrors .ErrUserNotFound {
@@ -291,7 +286,7 @@ func (in *InfluxDBStore) createAdminUserIfNotExists() error {
291
286
} else {
292
287
return err
293
288
}
294
- if ! userInfo .Admin {
289
+ if ! userInfo .Admin { // must be admin user.
295
290
return errors .New ("failed to validate InfluxDB user type, found non-admin user" )
296
291
}
297
292
return nil
@@ -366,6 +361,9 @@ func (in *InfluxDBStore) init(server *influxDBServer.Server) error {
366
361
if err != nil {
367
362
return err
368
363
}
364
+
365
+ // TODO: Upgrade to client v2, see: github.com/influxdata/influxdb/blob/master/client/v2/client.go
366
+ // We're currently using v1.
369
367
con , err := influxDBClient .NewClient (influxDBClient.Config {
370
368
URL : * url ,
371
369
Username : in .adminUser .Username ,
@@ -391,7 +389,8 @@ func (in *InfluxDBStore) init(server *influxDBServer.Server) error {
391
389
if err := in .createDBIfNotExists (); err != nil {
392
390
return err
393
391
}
394
- // TODO: support specifying the number of traces per page.
392
+
393
+ // TODO: let lib users decide `in.tracesPerPage` through InfluxDBStoreConfig.
395
394
in .tracesPerPage = defaultTracesPerPage
396
395
return nil
397
396
}
@@ -423,18 +422,15 @@ func annotationsFromRow(r *influxDBModels.Row) (*Annotations, error) {
423
422
fields = r .Values [0 ]
424
423
}
425
424
426
- // len(r.Values) cannot be greater than 1.
427
- // Values[0] is the slice containing a span's
428
- // annotation values.
425
+ // `len(r.Values)` cannot be greater than 1. `r.Values[0]` is an slice containing annotation values.
429
426
if len (r .Values ) > 1 {
430
427
return nil , errors .New ("unexpected multiple row values" )
431
428
}
432
429
annotations := make (Annotations , 0 )
433
430
434
- // Iterates over fields which represent span's annotation values .
431
+ // Iterates over fields, each field represents an `Annotation.Value` .
435
432
for i , field := range fields {
436
- // It is safe to do column[0] (eg. 'Server.Request.Method')
437
- // matches fields[0] (eg. 'GET')
433
+ // It's safe to do that column[0] (eg. 'Server.Request.Method') matches fields[0] (eg. 'GET').
438
434
key := r .Columns [i ]
439
435
var value []byte
440
436
switch field .(type ) {
@@ -464,42 +460,31 @@ func extendFields(dst, src pointFields) pointFields {
464
460
return dst
465
461
}
466
462
467
- // filterSchemas returns `Annotations` with items taken from `anns`.
468
- // It finds the annotation with key: `schemaFieldName`, which is later use
469
- // to discard schema related annotations not present on it's value.
463
+ // filterSchemas returns `Annotations` which contains items taken from `anns`.
464
+ // Some items from `anns` won't be included(those which were not saved by `InfluxDBStore.Collect(...)`).
470
465
func filterSchemas (anns []Annotation ) Annotations {
471
466
var annotations Annotations
472
467
473
- // Finds the annotation with key `schemasFieldName`.
468
+ // Finds an annotation which: `Annotation.Key` is equal to `schemasFieldName`.
474
469
schemasAnn := findSchemasAnnotation (anns )
475
470
476
- // Convert it to a string slice which contains the schemas .
471
+ // Converts `schemasAnn.Value` into slice of strings, each item is a schema .
477
472
// Eg. schemas := []string{"HTTPClient", "HTTPServer"}
478
473
schemas := strings .Split (string (schemasAnn .Value ), schemasFieldSeparator )
479
474
480
- // Iterate over `anns` to check if each annotation is a schema related one
481
- // if so it's added to the `annotations` be returned, but only if it's present
482
- // on `schemas`.
483
- // If annotation is not schema related, it's added to annotations returned.
475
+ // Iterates over `anns` to check if each annotation should be included or not to the `annotations` be returned.
484
476
for _ , a := range anns {
485
- if strings .HasPrefix (a .Key , schemaPrefix ) {
486
- schema := a .Key [len (schemaPrefix ):]
487
-
488
- // If schema does not exists; annotation `a` is not added to
489
- // the `annotations` be returned because it was not saved
490
- // by `Collect(...)`.
491
- // But exists because InfluxDB returns all fields(annotations)
492
- // even those ones not explicit written by `Collect(...)`.
493
- //
494
- // Eg. if point "a" is written with a field "foo" &
495
- // point "b" with a field "bar" (both "a" & "b" written in the
496
- // same measurement), when querying for those points the result
497
- // will contain two fields "foo" & "bar", even though field "bar"
498
- // was not present when writing Point "a".
499
- if schemaExists (schema , schemas ) {
500
- // Schema exists, meaning `Collect(...)` method
501
- // saved this annotation.
477
+ if strings .HasPrefix (a .Key , schemaPrefix ) { // Check if current annotation is schema related one.
478
+ schema := a .Key [len (schemaPrefix ):] // Excludes the schema prefix part.
479
+
480
+ // Checks if `schema` exists in `schemas`, if so means current annotation was saved by `InfluxDBStore.Collect(...)`.
481
+ // If does not exist it means current annotation is empty on `InfluxDBStore.dbName` but still included within a query result.
482
+ // Eg. If point "f" with a field "foo" & point "b" with a field "bar" are written to the same InfluxDB measurement
483
+ // and later queried, the result will include two fields: "foo" & "bar" for both points, even though each was written with one field.
484
+ if schemaExists (schema , schemas ) { // Saved by `InfluxDBStore.Collect(...)` so should be added.
502
485
annotations = append (annotations , a )
486
+ } else { // Do not add current annotation, is empty & not saved by `InfluxDBStore.Collect(...)`.
487
+ continue
503
488
}
504
489
} else {
505
490
// Not a schema related annotation so just add it.
@@ -519,8 +504,7 @@ func schemaExists(schema string, schemas []string) bool {
519
504
return false
520
505
}
521
506
522
- // findSchemasAnnotation finds & returns an annotation
523
- // with key: `schemasFieldName`.
507
+ // findSchemasAnnotation finds & returns an annotation which: `Annotation.Key` is equal to `schemasFieldName`.
524
508
func findSchemasAnnotation (anns []Annotation ) * Annotation {
525
509
for _ , a := range anns {
526
510
if a .Key == schemasFieldName {
@@ -530,35 +514,31 @@ func findSchemasAnnotation(anns []Annotation) *Annotation {
530
514
return nil
531
515
}
532
516
533
- // mergeSchemasField merges new and old which are a set of schemas(strings)
534
- // separated by `schemasFieldSeparator` - eg. "HTTPClient,HTTPServer"
517
+ // mergeSchemasField merges new and old which are a set of schemas(strings) separated by `schemasFieldSeparator`.
535
518
// Returns the result of merging new & old without duplications.
536
519
func mergeSchemasField (new , old interface {}) (string , error ) {
537
- // Since both new and old are same data structures
538
- // (a set of strings separated by `schemasFieldSeparator`)
539
- // same code logic is applied.
520
+ // Since new and old have the same data structures(a set of strings separated by `schemasFieldSeparator`).
521
+ // So same logic is applied to both.
540
522
fields := []interface {}{new , old }
541
523
var strFields []string
542
524
543
- // Iterate over fields in order to cast each to string type
544
- // and append it to `strFields` for later usage.
525
+ // Iterates over fields to convert each into a string and appends it to `strFields` for later usage.
545
526
for _ , field := range fields {
546
527
switch field .(type ) {
547
528
case string :
548
529
strFields = append (strFields , field .(string ))
549
530
case nil :
550
531
continue
551
532
default :
552
- return "" , fmt .Errorf ("unexpected event field type: %v" , reflect .TypeOf (field ))
533
+ return "" , fmt .Errorf ("unexpected schema field type: %v" , reflect .TypeOf (field ))
553
534
}
554
535
}
555
536
556
- // Cache for schemas; used to keep track of non duplicated schemas
557
- // to be returned.
537
+ // Schemas cache, used to keep track schemas to be returned(without duplications).
558
538
schemas := make (map [string ]string , 0 )
559
539
560
- // Iterate over `strFields` to transform each to a slice([]string)
561
- // which each element is an schema that are added to schemas cache .
540
+ // Iterates over `strFields` to convert each into a slice([]string), then iterates over it in order to
541
+ // add each to `schemas` if not present already .
562
542
for _ , strField := range strFields {
563
543
if strField == "" {
564
544
continue
@@ -576,25 +556,25 @@ func mergeSchemasField(new, old interface{}) (string, error) {
576
556
result = append (result , k )
577
557
}
578
558
579
- // Return a string which contains all the schemas separated by `schemasFieldSeparator`.
559
+ // Returns a string which contains all the schemas separated by `schemasFieldSeparator`.
580
560
return strings .Join (result , schemasFieldSeparator ), nil
581
561
}
582
562
583
- // schemasFromAnnotations finds schemas in `anns` and builds a data structure
584
- // which is a set of all schemas found, those are separated by `schemasFieldSeparator`
585
- // and returned as string.
563
+ // schemasFromAnnotations returns a string(a set of schemas(strings) separated by `schemasFieldSeparator`) - eg. "HTTPClient,HTTPServer,name".
564
+ // Each schema is extracted from each `Annotation.Key` from `anns`.
586
565
func schemasFromAnnotations (anns []Annotation ) string {
587
566
var schemas []string
588
567
for _ , ann := range anns {
589
- if strings .HasPrefix (ann .Key , schemaPrefix ) { // Check if is an annotation for a schema.
568
+
569
+ // Checks if current annotation is schema related.
570
+ if strings .HasPrefix (ann .Key , schemaPrefix ) {
590
571
schemas = append (schemas , ann .Key [len (schemaPrefix ):])
591
572
}
592
573
}
593
574
return strings .Join (schemas , schemasFieldSeparator )
594
575
}
595
576
596
- // withoutEmptyFields returns a pointFields without
597
- // those fields that has empty values.
577
+ // withoutEmptyFields filters `pf` and returns `pointFields` excluding those that have empty values.
598
578
func withoutEmptyFields (pf pointFields ) pointFields {
599
579
r := make (pointFields , 0 )
600
580
for k , v := range pf {
0 commit comments