Skip to content

Commit 6d10ff7

Browse files
committed
adds support to save schemas field to spans measurement
to keep track which schemas were saved by `Collect(...)`
1 parent c4a97ba commit 6d10ff7

File tree

2 files changed

+180
-6
lines changed

2 files changed

+180
-6
lines changed

influxdb_store.go

+141-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"net/url"
77
"reflect"
8+
"strings"
89
"time"
910

1011
influxDBClient "github.com/influxdb/influxdb/client"
@@ -13,9 +14,11 @@ import (
1314
)
1415

1516
const (
16-
dbName string = "appdash" // InfluxDB db name.
17-
spanMeasurementName string = "spans" // InfluxDB container name for trace spans.
18-
defaultTracesPerPage int = 10 // Default number of traces per page.
17+
dbName string = "appdash" // InfluxDB db name.
18+
defaultTracesPerPage int = 10 // Default number of traces per page.
19+
schemasFieldName string = "schemas" // Span's measurement field name for schemas field.
20+
schemasFieldSeparator string = "," // Span's measurement character separator for schemas field.
21+
spanMeasurementName string = "spans" // InfluxDB container name for trace spans.
1922
)
2023

2124
// Compile-time "implements" check.
@@ -67,8 +70,15 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
6770
// pointFields that only contains:
6871
// - Fields that are not saved on DB.
6972
// - Fields that are saved but have empty values.
70-
p.Fields = extendFields(fields, withoutEmptyFields(p.Fields))
73+
fields := extendFields(fields, withoutEmptyFields(p.Fields))
74+
schemas, err := mergeSchemasField(schemasFromAnnotations(anns), p.Fields[schemasFieldName])
75+
if err != nil {
76+
return err
77+
}
78+
fields[schemasFieldName] = schemas
79+
p.Fields = fields
7180
} else { // new span to be saved on DB.
81+
fields[schemasFieldName] = schemasFromAnnotations(anns)
7282
p = &influxDBClient.Point{
7383
Measurement: spanMeasurementName,
7484
Tags: tags, // indexed metadata.
@@ -119,7 +129,7 @@ func (in *InfluxDBStore) Trace(id ID) (*Trace, error) {
119129
if err != nil {
120130
return trace, nil
121131
}
122-
span.Annotations = *annotations
132+
span.Annotations = filterSchemas(*annotations)
123133
if span.ID.IsRoot() && rootSpanSet {
124134
return nil, errors.New("unexpected multiple root spans")
125135
}
@@ -205,7 +215,7 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
205215
if err != nil {
206216
return nil, err
207217
}
208-
span.Annotations = *annotations
218+
span.Annotations = filterSchemas(*annotations)
209219
trace, present := tracesCache[span.ID.Trace]
210220
if !present { // Root trace not added.
211221
return nil, errors.New("parent not found")
@@ -368,6 +378,131 @@ func extendFields(dst, src pointFields) pointFields {
368378
return dst
369379
}
370380

381+
// filterSchemas returns `Annotations` with items taken from `anns`
382+
// without those items that are not included within the value of
383+
// the annotation with key: "schemas".
384+
func filterSchemas(anns []Annotation) Annotations {
385+
var annotations Annotations
386+
387+
// Finds the annotation with key `schemasFieldName`.
388+
schemasAnn := findSchemasAnnotation(anns)
389+
390+
// Convert it to a string slice which contains the schemas.
391+
schemas := strings.Split(string(schemasAnn.Value), schemasFieldSeparator)
392+
393+
// Iterate over `anns` to check if each annotation is a schema related one
394+
// if so it's added to the `annotations` be returned, but only if it's present
395+
// on `schemas`.
396+
for _, a := range anns {
397+
if strings.HasPrefix(a.Key, schemaPrefix) {
398+
schema := a.Key[len(schemaPrefix):]
399+
// If schema does not exists; annotation `a` is not added to
400+
// the `annotations` be returned because it was not saved
401+
// by `Collect(...)`.
402+
// But exists because InfluxDB returns all fields(annotations)
403+
// even those ones not explicit written by `Collect(...)`.
404+
// Eg. if point "a" is written with a field "foo" &
405+
// point "b" with a field "bar" (both "a" & "b" written in the
406+
// same measurement), when querying for those points the result
407+
// will contain two fields "foo" & "bar", even though "bar" was
408+
// not present when writing Point "a".
409+
if schemaExists(schema, schemas) {
410+
// Schema exists, meaning `Collect(...)` method
411+
// saved this annotation.
412+
annotations = append(annotations, a)
413+
}
414+
} else {
415+
// Not a schema related annotation so just add it.
416+
annotations = append(annotations, a)
417+
}
418+
}
419+
return annotations
420+
}
421+
422+
// schemaExists checks if `schema` is present on `schemas`.
423+
func schemaExists(schema string, schemas []string) bool {
424+
for _, s := range schemas {
425+
if schema == s {
426+
return true
427+
}
428+
}
429+
return false
430+
}
431+
432+
// findSchemasAnnotation finds & returns an annotation
433+
// with key: `schemasFieldName`.
434+
func findSchemasAnnotation(anns []Annotation) *Annotation {
435+
for _, a := range anns {
436+
if a.Key == schemasFieldName {
437+
return &a
438+
}
439+
}
440+
return nil
441+
}
442+
443+
// mergeSchemasField merges new and old which are a set of schemas(strings)
444+
// separated by `schemasFieldSeparator` - eg. "HTTPClient,HTTPServer"
445+
// Returns the result of merging new & old without duplications.
446+
func mergeSchemasField(new, old interface{}) (string, error) {
447+
// Since both new and old are same data structures
448+
// (a set of strings separated by `schemasFieldSeparator`)
449+
// same code logic is applied.
450+
fields := []interface{}{new, old}
451+
var strFields []string
452+
453+
// Iterate over fields in order to cast each to string type
454+
// and append it to `strFields` for later usage.
455+
for _, field := range fields {
456+
switch field.(type) {
457+
case string:
458+
strFields = append(strFields, field.(string))
459+
case nil:
460+
continue
461+
default:
462+
return "", fmt.Errorf("unexpected event field type: %v", reflect.TypeOf(field))
463+
}
464+
}
465+
466+
// Cache for schemas; used to keep track of non duplicated schemas
467+
// to be returned.
468+
schemas := make(map[string]string, 0)
469+
470+
// Iterate over `strFields` to transform each to a slice([]string)
471+
// which each element is an schema that are added to schemas cache.
472+
for _, strField := range strFields {
473+
if strField == "" {
474+
continue
475+
}
476+
sf := strings.Split(strField, schemasFieldSeparator)
477+
for _, s := range sf {
478+
if _, found := schemas[s]; !found {
479+
schemas[s] = s
480+
}
481+
}
482+
}
483+
484+
var result []string
485+
for k, _ := range schemas {
486+
result = append(result, k)
487+
}
488+
489+
// Return a string which contains all the schemas separated by `schemasFieldSeparator`.
490+
return strings.Join(result, schemasFieldSeparator), nil
491+
}
492+
493+
// schemasFromAnnotations finds schemas in `anns` and builds a data structure
494+
// which is a set of all schemas found, those are separated by `schemasFieldSeparator`
495+
// and returned as string.
496+
func schemasFromAnnotations(anns []Annotation) string {
497+
var schemas []string
498+
for _, ann := range anns {
499+
if strings.HasPrefix(ann.Key, schemaPrefix) { // Check if is an annotation for a schema.
500+
schemas = append(schemas, ann.Key[len(schemaPrefix):])
501+
}
502+
}
503+
return strings.Join(schemas, schemasFieldSeparator)
504+
}
505+
371506
// withoutEmptyFields returns a pointFields without
372507
// those fields that has empty values.
373508
func withoutEmptyFields(pf pointFields) pointFields {

influxdb_store_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package appdash
2+
3+
import "testing"
4+
5+
func TestMergeSchemasField(t *testing.T) {
6+
cases := []struct {
7+
NewField string
8+
OldField string
9+
Want string
10+
}{
11+
{NewField: "", OldField: "", Want: ""},
12+
{NewField: "HTTPClient", OldField: "", Want: "HTTPClient"},
13+
{NewField: "", OldField: "name", Want: "name"},
14+
{NewField: "HTTPClient", OldField: "name", Want: "HTTPClient,name"},
15+
{NewField: "HTTPServer", OldField: "HTTPClient,name", Want: "HTTPServer,HTTPClient,name"},
16+
}
17+
for i, c := range cases {
18+
got, err := mergeSchemasField(c.NewField, c.OldField)
19+
if err != nil {
20+
t.Fatalf("unexpected error: %v", err)
21+
}
22+
if got != c.Want {
23+
t.Fatalf("case #%d - got: %v, want: %v", i, got, c.Want)
24+
}
25+
}
26+
}
27+
28+
func TestSchemasFromAnnotations(t *testing.T) {
29+
anns := []Annotation{
30+
Annotation{Key: schemaPrefix + "HTTPClient"},
31+
Annotation{Key: schemaPrefix + "HTTPServer"},
32+
Annotation{Key: schemaPrefix + "name"},
33+
}
34+
got := schemasFromAnnotations(anns)
35+
want := "HTTPClient,HTTPServer,name"
36+
if got != want {
37+
t.Fatalf("got: %v, want: %v", got, want)
38+
}
39+
}

0 commit comments

Comments
 (0)