diff --git a/influxdb_store.go b/influxdb_store.go index 6d0a2fcf..bcdd7f3c 100644 --- a/influxdb_store.go +++ b/influxdb_store.go @@ -55,35 +55,32 @@ type InfluxDBStore struct { } func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error { - // Find a span's point, if found it will be rewritten with new annotations(`anns`) - // if not found, a new span's point will be created. + // Find a span's point, if found it will be rewritten with new given annotations(`anns`) + // if not found, a new span's point will be write to `in.dbName`. p, err := in.findSpanPoint(id) if err != nil { return err } - // trace_id, span_id & parent_id are set as tags - // because InfluxDB tags are indexed & those values - // are used later on queries. + // trace_id, span_id & parent_id are mostly used as part of the "where" part on queries so + // to have performant queries these are set as tags(InfluxDB indexes tags). tags := map[string]string{ "trace_id": id.Trace.String(), "span_id": id.Span.String(), "parent_id": id.Parent.String(), } - // Saving annotations as InfluxDB measurement spans fields - // which are not indexed. + // Annotations `anns` are set as fields(InfluxDB does not index fields). fields := make(map[string]interface{}, len(anns)) for _, ann := range anns { fields[ann.Key] = string(ann.Value) } - if p != nil { // span exists on DB. + if p != nil { // span exists on `in.dbName`. p.Measurement = spanMeasurementName p.Tags = tags - // Using extendFields & withoutEmptyFields in order to have - // pointFields that only contains: + // Using extendFields & withoutEmptyFields in order to have pointFields that only contains: // - Fields that are not saved on DB. // - Fields that are saved but have empty values. fields := extendFields(fields, withoutEmptyFields(p.Fields)) @@ -98,18 +95,18 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error { p.Fields = fields } else { // new span to be saved on DB. - // A field `schemasFieldName` contains all the schemas found on `anns`. + // `schemasFieldName` field contains all the schemas found on `anns`. // Eg. fields[schemasFieldName] = "HTTPClient,HTTPServer" fields[schemasFieldName] = schemasFromAnnotations(anns) p = &influxDBClient.Point{ Measurement: spanMeasurementName, - Tags: tags, // indexed metadata. - Fields: fields, // non-indexed metadata. + Tags: tags, + Fields: fields, Time: time.Now().UTC(), } } - // InfluxDB point represents a single span. + // A single point represents one span. pts := []influxDBClient.Point{*p} bps := influxDBClient.BatchPoints{ Points: pts, @@ -138,7 +135,7 @@ func (in *InfluxDBStore) Trace(id ID) (*Trace, error) { return nil, errors.New("trace not found") } - // Iterate over series(spans) to create trace children's & set trace fields. + // Iterate over series(spans) to set `trace` fields. var rootSpanSet bool for _, s := range result.Series { var isRootSpan bool @@ -186,7 +183,7 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) { // Cache to keep track of traces to be returned. tracesCache := make(map[ID]*Trace, 0) - // Iterate over series(spans) to create traces. + // Iterate over series(spans) to create root traces. for _, s := range rootSpansResult.Series { span, err := newSpanFromRow(&s) if err != nil { @@ -218,15 +215,14 @@ func (in *InfluxDBStore) Traces() ([]*Trace, error) { i += 1 } - // Queries for all children spans of the traces to be returned. + // Queries for all children spans of the root traces. childrenSpansQuery := fmt.Sprintf("SELECT * FROM spans %s GROUP BY *", where) childrenSpansResult, err := in.executeOneQuery(childrenSpansQuery) if err != nil { return nil, err } - // Iterate over series(children spans) to create sub-traces - // and associates sub-traces with it's parent trace. + // Iterate over series(children spans) to set sub-traces to it's corresponding root trace. for _, s := range childrenSpansResult.Series { span, err := newSpanFromRow(&s) if err != nil { @@ -268,7 +264,7 @@ func (in *InfluxDBStore) createDBIfNotExists() error { } } - // If no errors query execution was successfully - either DB was created or already exists. + // If there are no errors, query execution was successfully - either DB was created or already exists. response, err := in.con.Query(influxDBClient.Query{Command: q}) if err != nil { return err @@ -279,8 +275,7 @@ func (in *InfluxDBStore) createDBIfNotExists() error { return nil } -// createAdminUserIfNotExists creates an admin user -// using `in.adminUser` credentials if does not exist. +// createAdminUserIfNotExists finds admin user(`in.adminUser`) if not found it's created. func (in *InfluxDBStore) createAdminUserIfNotExists() error { userInfo, err := in.server.MetaClient.Authenticate(in.adminUser.Username, in.adminUser.Password) if err == influxDBErrors.ErrUserNotFound { @@ -291,7 +286,7 @@ func (in *InfluxDBStore) createAdminUserIfNotExists() error { } else { return err } - if !userInfo.Admin { + if !userInfo.Admin { // must be admin user. return errors.New("failed to validate InfluxDB user type, found non-admin user") } return nil @@ -366,6 +361,9 @@ func (in *InfluxDBStore) init(server *influxDBServer.Server) error { if err != nil { return err } + + // TODO: Upgrade to client v2, see: github.com/influxdata/influxdb/blob/master/client/v2/client.go + // We're currently using v1. con, err := influxDBClient.NewClient(influxDBClient.Config{ URL: *url, Username: in.adminUser.Username, @@ -391,7 +389,8 @@ func (in *InfluxDBStore) init(server *influxDBServer.Server) error { if err := in.createDBIfNotExists(); err != nil { return err } - // TODO: support specifying the number of traces per page. + + // TODO: let lib users decide `in.tracesPerPage` through InfluxDBStoreConfig. in.tracesPerPage = defaultTracesPerPage return nil } @@ -423,18 +422,15 @@ func annotationsFromRow(r *influxDBModels.Row) (*Annotations, error) { fields = r.Values[0] } - // len(r.Values) cannot be greater than 1. - // Values[0] is the slice containing a span's - // annotation values. + // `len(r.Values)` cannot be greater than 1. `r.Values[0]` is an slice containing annotation values. if len(r.Values) > 1 { return nil, errors.New("unexpected multiple row values") } annotations := make(Annotations, 0) - // Iterates over fields which represent span's annotation values. + // Iterates over fields, each field represents an `Annotation.Value`. for i, field := range fields { - // It is safe to do column[0] (eg. 'Server.Request.Method') - // matches fields[0] (eg. 'GET') + // It's safe to do that column[0] (eg. 'Server.Request.Method') matches fields[0] (eg. 'GET'). key := r.Columns[i] var value []byte switch field.(type) { @@ -464,42 +460,31 @@ func extendFields(dst, src pointFields) pointFields { return dst } -// filterSchemas returns `Annotations` with items taken from `anns`. -// It finds the annotation with key: `schemaFieldName`, which is later use -// to discard schema related annotations not present on it's value. +// filterSchemas returns `Annotations` which contains items taken from `anns`. +// Some items from `anns` won't be included(those which were not saved by `InfluxDBStore.Collect(...)`). func filterSchemas(anns []Annotation) Annotations { var annotations Annotations - // Finds the annotation with key `schemasFieldName`. + // Finds an annotation which: `Annotation.Key` is equal to `schemasFieldName`. schemasAnn := findSchemasAnnotation(anns) - // Convert it to a string slice which contains the schemas. + // Converts `schemasAnn.Value` into slice of strings, each item is a schema. // Eg. schemas := []string{"HTTPClient", "HTTPServer"} schemas := strings.Split(string(schemasAnn.Value), schemasFieldSeparator) - // Iterate over `anns` to check if each annotation is a schema related one - // if so it's added to the `annotations` be returned, but only if it's present - // on `schemas`. - // If annotation is not schema related, it's added to annotations returned. + // Iterates over `anns` to check if each annotation should be included or not to the `annotations` be returned. for _, a := range anns { - if strings.HasPrefix(a.Key, schemaPrefix) { - schema := a.Key[len(schemaPrefix):] - - // If schema does not exists; annotation `a` is not added to - // the `annotations` be returned because it was not saved - // by `Collect(...)`. - // But exists because InfluxDB returns all fields(annotations) - // even those ones not explicit written by `Collect(...)`. - // - // Eg. if point "a" is written with a field "foo" & - // point "b" with a field "bar" (both "a" & "b" written in the - // same measurement), when querying for those points the result - // will contain two fields "foo" & "bar", even though field "bar" - // was not present when writing Point "a". - if schemaExists(schema, schemas) { - // Schema exists, meaning `Collect(...)` method - // saved this annotation. + if strings.HasPrefix(a.Key, schemaPrefix) { // Check if current annotation is schema related one. + schema := a.Key[len(schemaPrefix):] // Excludes the schema prefix part. + + // Checks if `schema` exists in `schemas`, if so means current annotation was saved by `InfluxDBStore.Collect(...)`. + // If does not exist it means current annotation is empty on `InfluxDBStore.dbName` but still included within a query result. + // Eg. If point "f" with a field "foo" & point "b" with a field "bar" are written to the same InfluxDB measurement + // and later queried, the result will include two fields: "foo" & "bar" for both points, even though each was written with one field. + if schemaExists(schema, schemas) { // Saved by `InfluxDBStore.Collect(...)` so should be added. annotations = append(annotations, a) + } else { // Do not add current annotation, is empty & not saved by `InfluxDBStore.Collect(...)`. + continue } } else { // Not a schema related annotation so just add it. @@ -519,8 +504,7 @@ func schemaExists(schema string, schemas []string) bool { return false } -// findSchemasAnnotation finds & returns an annotation -// with key: `schemasFieldName`. +// findSchemasAnnotation finds & returns an annotation which: `Annotation.Key` is equal to `schemasFieldName`. func findSchemasAnnotation(anns []Annotation) *Annotation { for _, a := range anns { if a.Key == schemasFieldName { @@ -530,18 +514,15 @@ func findSchemasAnnotation(anns []Annotation) *Annotation { return nil } -// mergeSchemasField merges new and old which are a set of schemas(strings) -// separated by `schemasFieldSeparator` - eg. "HTTPClient,HTTPServer" +// mergeSchemasField merges new and old which are a set of schemas(strings) separated by `schemasFieldSeparator`. // Returns the result of merging new & old without duplications. func mergeSchemasField(new, old interface{}) (string, error) { - // Since both new and old are same data structures - // (a set of strings separated by `schemasFieldSeparator`) - // same code logic is applied. + // Since new and old have the same data structures(a set of strings separated by `schemasFieldSeparator`). + // So same logic is applied to both. fields := []interface{}{new, old} var strFields []string - // Iterate over fields in order to cast each to string type - // and append it to `strFields` for later usage. + // Iterates over fields to convert each into a string and appends it to `strFields` for later usage. for _, field := range fields { switch field.(type) { case string: @@ -549,16 +530,15 @@ func mergeSchemasField(new, old interface{}) (string, error) { case nil: continue default: - return "", fmt.Errorf("unexpected event field type: %v", reflect.TypeOf(field)) + return "", fmt.Errorf("unexpected schema field type: %v", reflect.TypeOf(field)) } } - // Cache for schemas; used to keep track of non duplicated schemas - // to be returned. + // Schemas cache, used to keep track schemas to be returned(without duplications). schemas := make(map[string]string, 0) - // Iterate over `strFields` to transform each to a slice([]string) - // which each element is an schema that are added to schemas cache. + // Iterates over `strFields` to convert each into a slice([]string), then iterates over it in order to + // add each to `schemas` if not present already. for _, strField := range strFields { if strField == "" { continue @@ -576,25 +556,25 @@ func mergeSchemasField(new, old interface{}) (string, error) { result = append(result, k) } - // Return a string which contains all the schemas separated by `schemasFieldSeparator`. + // Returns a string which contains all the schemas separated by `schemasFieldSeparator`. return strings.Join(result, schemasFieldSeparator), nil } -// schemasFromAnnotations finds schemas in `anns` and builds a data structure -// which is a set of all schemas found, those are separated by `schemasFieldSeparator` -// and returned as string. +// schemasFromAnnotations returns a string(a set of schemas(strings) separated by `schemasFieldSeparator`) - eg. "HTTPClient,HTTPServer,name". +// Each schema is extracted from each `Annotation.Key` from `anns`. func schemasFromAnnotations(anns []Annotation) string { var schemas []string for _, ann := range anns { - if strings.HasPrefix(ann.Key, schemaPrefix) { // Check if is an annotation for a schema. + + // Checks if current annotation is schema related. + if strings.HasPrefix(ann.Key, schemaPrefix) { schemas = append(schemas, ann.Key[len(schemaPrefix):]) } } return strings.Join(schemas, schemasFieldSeparator) } -// withoutEmptyFields returns a pointFields without -// those fields that has empty values. +// withoutEmptyFields filters `pf` and returns `pointFields` excluding those that have empty values. func withoutEmptyFields(pf pointFields) pointFields { r := make(pointFields, 0) for k, v := range pf {