Skip to content
This repository has been archived by the owner on Oct 29, 2021. It is now read-only.

Commit

Permalink
improves comments readability & adds a low priority TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-ramon committed Feb 25, 2016
1 parent 4a4b359 commit 62b5c69
Showing 1 changed file with 57 additions and 77 deletions.
134 changes: 57 additions & 77 deletions influxdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,32 @@ type InfluxDBStore struct {
}

func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
// Find a span's point, if found it will be rewritten with new annotations(`anns`)
// if not found, a new span's point will be created.
// Find a span's point, if found it will be rewritten with new given annotations(`anns`)
// if not found, a new span's point will be write to `in.dbName`.
p, err := in.findSpanPoint(id)
if err != nil {
return err
}

// trace_id, span_id & parent_id are set as tags
// because InfluxDB tags are indexed & those values
// are used later on queries.
// trace_id, span_id & parent_id are mostly used as part of the "where" part on queries so
// to have performant queries these are set as tags(InfluxDB indexes tags).
tags := map[string]string{
"trace_id": id.Trace.String(),
"span_id": id.Span.String(),
"parent_id": id.Parent.String(),
}

// Saving annotations as InfluxDB measurement spans fields
// which are not indexed.
// Annotations `anns` are set as fields(InfluxDB does not index fields).
fields := make(map[string]interface{}, len(anns))
for _, ann := range anns {
fields[ann.Key] = string(ann.Value)
}

if p != nil { // span exists on DB.
if p != nil { // span exists on `in.dbName`.
p.Measurement = spanMeasurementName
p.Tags = tags

// Using extendFields & withoutEmptyFields in order to have
// pointFields that only contains:
// Using extendFields & withoutEmptyFields in order to have pointFields that only contains:
// - Fields that are not saved on DB.
// - Fields that are saved but have empty values.
fields := extendFields(fields, withoutEmptyFields(p.Fields))
Expand All @@ -98,18 +95,18 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
p.Fields = fields
} else { // new span to be saved on DB.

// A field `schemasFieldName` contains all the schemas found on `anns`.
// `schemasFieldName` field contains all the schemas found on `anns`.
// Eg. fields[schemasFieldName] = "HTTPClient,HTTPServer"
fields[schemasFieldName] = schemasFromAnnotations(anns)
p = &influxDBClient.Point{
Measurement: spanMeasurementName,
Tags: tags, // indexed metadata.
Fields: fields, // non-indexed metadata.
Tags: tags,
Fields: fields,
Time: time.Now().UTC(),
}
}

// InfluxDB point represents a single span.
// A single point represents one span.
pts := []influxDBClient.Point{*p}
bps := influxDBClient.BatchPoints{
Points: pts,
Expand Down Expand Up @@ -138,7 +135,7 @@ func (in *InfluxDBStore) Trace(id ID) (*Trace, error) {
return nil, errors.New("trace not found")
}

// Iterate over series(spans) to create trace children's & set trace fields.
// Iterate over series(spans) to set `trace` fields.
var rootSpanSet bool
for _, s := range result.Series {
var isRootSpan bool
Expand Down Expand Up @@ -186,7 +183,7 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
// Cache to keep track of traces to be returned.
tracesCache := make(map[ID]*Trace, 0)

// Iterate over series(spans) to create traces.
// Iterate over series(spans) to create root traces.
for _, s := range rootSpansResult.Series {
span, err := newSpanFromRow(&s)
if err != nil {
Expand Down Expand Up @@ -218,15 +215,14 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) {
i += 1
}

// Queries for all children spans of the traces to be returned.
// Queries for all children spans of the root traces.
childrenSpansQuery := fmt.Sprintf("SELECT * FROM spans %s GROUP BY *", where)
childrenSpansResult, err := in.executeOneQuery(childrenSpansQuery)
if err != nil {
return nil, err
}

// Iterate over series(children spans) to create sub-traces
// and associates sub-traces with it's parent trace.
// Iterate over series(children spans) to set sub-traces to it's corresponding root trace.
for _, s := range childrenSpansResult.Series {
span, err := newSpanFromRow(&s)
if err != nil {
Expand Down Expand Up @@ -268,7 +264,7 @@ func (in *InfluxDBStore) createDBIfNotExists() error {
}
}

// If no errors query execution was successfully - either DB was created or already exists.
// If there are no errors, query execution was successfully - either DB was created or already exists.
response, err := in.con.Query(influxDBClient.Query{Command: q})
if err != nil {
return err
Expand All @@ -279,8 +275,7 @@ func (in *InfluxDBStore) createDBIfNotExists() error {
return nil
}

// createAdminUserIfNotExists creates an admin user
// using `in.adminUser` credentials if does not exist.
// createAdminUserIfNotExists finds admin user(`in.adminUser`) if not found it's created.
func (in *InfluxDBStore) createAdminUserIfNotExists() error {
userInfo, err := in.server.MetaClient.Authenticate(in.adminUser.Username, in.adminUser.Password)
if err == influxDBErrors.ErrUserNotFound {
Expand All @@ -291,7 +286,7 @@ func (in *InfluxDBStore) createAdminUserIfNotExists() error {
} else {
return err
}
if !userInfo.Admin {
if !userInfo.Admin { // must be admin user.
return errors.New("failed to validate InfluxDB user type, found non-admin user")
}
return nil
Expand Down Expand Up @@ -366,6 +361,9 @@ func (in *InfluxDBStore) init(server *influxDBServer.Server) error {
if err != nil {
return err
}

// TODO: Upgrade to client v2, see: github.com/influxdata/influxdb/blob/master/client/v2/client.go
// We're currently using v1.
con, err := influxDBClient.NewClient(influxDBClient.Config{
URL: *url,
Username: in.adminUser.Username,
Expand All @@ -391,7 +389,8 @@ func (in *InfluxDBStore) init(server *influxDBServer.Server) error {
if err := in.createDBIfNotExists(); err != nil {
return err
}
// TODO: support specifying the number of traces per page.

// TODO: let lib users decide `in.tracesPerPage` through InfluxDBStoreConfig.
in.tracesPerPage = defaultTracesPerPage
return nil
}
Expand Down Expand Up @@ -423,18 +422,15 @@ func annotationsFromRow(r *influxDBModels.Row) (*Annotations, error) {
fields = r.Values[0]
}

// len(r.Values) cannot be greater than 1.
// Values[0] is the slice containing a span's
// annotation values.
// `len(r.Values)` cannot be greater than 1. `r.Values[0]` is an slice containing annotation values.
if len(r.Values) > 1 {
return nil, errors.New("unexpected multiple row values")
}
annotations := make(Annotations, 0)

// Iterates over fields which represent span's annotation values.
// Iterates over fields, each field represents an `Annotation.Value`.
for i, field := range fields {
// It is safe to do column[0] (eg. 'Server.Request.Method')
// matches fields[0] (eg. 'GET')
// It's safe to do that column[0] (eg. 'Server.Request.Method') matches fields[0] (eg. 'GET').
key := r.Columns[i]
var value []byte
switch field.(type) {
Expand Down Expand Up @@ -464,42 +460,31 @@ func extendFields(dst, src pointFields) pointFields {
return dst
}

// filterSchemas returns `Annotations` with items taken from `anns`.
// It finds the annotation with key: `schemaFieldName`, which is later use
// to discard schema related annotations not present on it's value.
// filterSchemas returns `Annotations` which contains items taken from `anns`.
// Some items from `anns` won't be included(those which were not saved by `InfluxDBStore.Collect(...)`).
func filterSchemas(anns []Annotation) Annotations {
var annotations Annotations

// Finds the annotation with key `schemasFieldName`.
// Finds an annotation which: `Annotation.Key` is equal to `schemasFieldName`.
schemasAnn := findSchemasAnnotation(anns)

// Convert it to a string slice which contains the schemas.
// Converts `schemasAnn.Value` into slice of strings, each item is a schema.
// Eg. schemas := []string{"HTTPClient", "HTTPServer"}
schemas := strings.Split(string(schemasAnn.Value), schemasFieldSeparator)

// Iterate over `anns` to check if each annotation is a schema related one
// if so it's added to the `annotations` be returned, but only if it's present
// on `schemas`.
// If annotation is not schema related, it's added to annotations returned.
// Iterates over `anns` to check if each annotation should be included or not to the `annotations` be returned.
for _, a := range anns {
if strings.HasPrefix(a.Key, schemaPrefix) {
schema := a.Key[len(schemaPrefix):]

// If schema does not exists; annotation `a` is not added to
// the `annotations` be returned because it was not saved
// by `Collect(...)`.
// But exists because InfluxDB returns all fields(annotations)
// even those ones not explicit written by `Collect(...)`.
//
// Eg. if point "a" is written with a field "foo" &
// point "b" with a field "bar" (both "a" & "b" written in the
// same measurement), when querying for those points the result
// will contain two fields "foo" & "bar", even though field "bar"
// was not present when writing Point "a".
if schemaExists(schema, schemas) {
// Schema exists, meaning `Collect(...)` method
// saved this annotation.
if strings.HasPrefix(a.Key, schemaPrefix) { // Check if current annotation is schema related one.
schema := a.Key[len(schemaPrefix):] // Excludes the schema prefix part.

// Checks if `schema` exists in `schemas`, if so means current annotation was saved by `InfluxDBStore.Collect(...)`.
// If does not exist it means current annotation is empty on `InfluxDBStore.dbName` but still included within a query result.
// Eg. If point "f" with a field "foo" & point "b" with a field "bar" are written to the same InfluxDB measurement
// and later queried, the result will include two fields: "foo" & "bar" for both points, even though each was written with one field.
if schemaExists(schema, schemas) { // Saved by `InfluxDBStore.Collect(...)` so should be added.
annotations = append(annotations, a)
} else { // Do not add current annotation, is empty & not saved by `InfluxDBStore.Collect(...)`.
continue
}
} else {
// Not a schema related annotation so just add it.
Expand All @@ -519,8 +504,7 @@ func schemaExists(schema string, schemas []string) bool {
return false
}

// findSchemasAnnotation finds & returns an annotation
// with key: `schemasFieldName`.
// findSchemasAnnotation finds & returns an annotation which: `Annotation.Key` is equal to `schemasFieldName`.
func findSchemasAnnotation(anns []Annotation) *Annotation {
for _, a := range anns {
if a.Key == schemasFieldName {
Expand All @@ -530,35 +514,31 @@ func findSchemasAnnotation(anns []Annotation) *Annotation {
return nil
}

// mergeSchemasField merges new and old which are a set of schemas(strings)
// separated by `schemasFieldSeparator` - eg. "HTTPClient,HTTPServer"
// mergeSchemasField merges new and old which are a set of schemas(strings) separated by `schemasFieldSeparator`.
// Returns the result of merging new & old without duplications.
func mergeSchemasField(new, old interface{}) (string, error) {
// Since both new and old are same data structures
// (a set of strings separated by `schemasFieldSeparator`)
// same code logic is applied.
// Since new and old have the same data structures(a set of strings separated by `schemasFieldSeparator`).
// So same logic is applied to both.
fields := []interface{}{new, old}
var strFields []string

// Iterate over fields in order to cast each to string type
// and append it to `strFields` for later usage.
// Iterates over fields to convert each into a string and appends it to `strFields` for later usage.
for _, field := range fields {
switch field.(type) {
case string:
strFields = append(strFields, field.(string))
case nil:
continue
default:
return "", fmt.Errorf("unexpected event field type: %v", reflect.TypeOf(field))
return "", fmt.Errorf("unexpected schema field type: %v", reflect.TypeOf(field))
}
}

// Cache for schemas; used to keep track of non duplicated schemas
// to be returned.
// Schemas cache, used to keep track schemas to be returned(without duplications).
schemas := make(map[string]string, 0)

// Iterate over `strFields` to transform each to a slice([]string)
// which each element is an schema that are added to schemas cache.
// Iterates over `strFields` to convert each into a slice([]string), then iterates over it in order to
// add each to `schemas` if not present already.
for _, strField := range strFields {
if strField == "" {
continue
Expand All @@ -576,25 +556,25 @@ func mergeSchemasField(new, old interface{}) (string, error) {
result = append(result, k)
}

// Return a string which contains all the schemas separated by `schemasFieldSeparator`.
// Returns a string which contains all the schemas separated by `schemasFieldSeparator`.
return strings.Join(result, schemasFieldSeparator), nil
}

// schemasFromAnnotations finds schemas in `anns` and builds a data structure
// which is a set of all schemas found, those are separated by `schemasFieldSeparator`
// and returned as string.
// schemasFromAnnotations returns a string(a set of schemas(strings) separated by `schemasFieldSeparator`) - eg. "HTTPClient,HTTPServer,name".
// Each schema is extracted from each `Annotation.Key` from `anns`.
func schemasFromAnnotations(anns []Annotation) string {
var schemas []string
for _, ann := range anns {
if strings.HasPrefix(ann.Key, schemaPrefix) { // Check if is an annotation for a schema.

// Checks if current annotation is schema related.
if strings.HasPrefix(ann.Key, schemaPrefix) {
schemas = append(schemas, ann.Key[len(schemaPrefix):])
}
}
return strings.Join(schemas, schemasFieldSeparator)
}

// withoutEmptyFields returns a pointFields without
// those fields that has empty values.
// withoutEmptyFields filters `pf` and returns `pointFields` excluding those that have empty values.
func withoutEmptyFields(pf pointFields) pointFields {
r := make(pointFields, 0)
for k, v := range pf {
Expand Down

0 comments on commit 62b5c69

Please sign in to comment.