This repository has been archived by the owner on Oct 29, 2021. It is now read-only.

Aggregation support for InfluxDBStore. #136

Merged · 16 commits · Apr 20, 2016
Changes shown below are from 1 commit.
1 change: 1 addition & 0 deletions examples/cmd/webapp-influxdb/main.go
@@ -64,6 +64,7 @@ func main() {
tapp := traceapp.New(nil)
tapp.Store = store
tapp.Queryer = store
tapp.Aggregator = store
log.Println("Appdash web UI running on HTTP :8700")
go func() {
log.Fatal(http.ListenAndServe(":8700", tapp))
109 changes: 109 additions & 0 deletions influxdb_store.go
@@ -1,6 +1,7 @@
package appdash

import (
"encoding/json"
"errors"
"fmt"
"net/url"
@@ -65,12 +66,39 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
"parent_id": id.Parent.String(),
}

// Find the start and end time of the span.
var events []Event
if err := UnmarshalEvents(anns, &events); err != nil {
return err
}
var (
foundItems int
name string
duration time.Duration
)
for _, ev := range events {
switch v := ev.(type) {
case spanName:
foundItems++
name = v.Name
case TimespanEvent:
foundItems++
duration = v.End().Sub(v.Start())
}
}

// Annotations `anns` are set as fields (InfluxDB does not index fields).
fields := make(map[string]interface{}, len(anns))
for _, ann := range anns {
fields[ann.Key] = string(ann.Value)
}

// If we have span name and duration, set them as a tag and field.
if foundItems == 2 {
tags["name"] = name
fields["duration"] = float64(duration) / float64(time.Second)
}

// `schemasFieldName` field contains all the schemas found on `anns`.
// Eg. fields[schemasFieldName] = "HTTPClient,HTTPServer"
fields[schemasFieldName] = schemasFromAnnotations(anns)
@@ -145,6 +173,85 @@ func (in *InfluxDBStore) Trace(id ID) (*Trace, error) {
return trace, nil
}

func mustJSONFloat64(x interface{}) float64 {
n := x.(json.Number)
v, err := n.Float64()
if err != nil {
panic(err)
}
return v
}

func mustJSONInt64(x interface{}) int64 {
n := x.(json.Number)
v, err := n.Int64()
if err != nil {
panic(err)
}
return v
}

// Aggregate implements the Aggregator interface.
func (in *InfluxDBStore) Aggregate(start, end time.Duration) ([]*AggregatedResult, error) {
// Find the mean (average), minimum, maximum, std. deviation, and count of
// all spans.
q := `SELECT MEAN("duration"),MIN("duration"),MAX("duration"),STDDEV("duration"),COUNT("duration") from spans`
@chris-ramon (Contributor) commented on Apr 18, 2016:

Great to see InfluxDB aggregation features landing here! I think this is a great working starting point 🌟

Also, I'd like to mention that continuous queries give us interesting room for performance improvements here: we could down-sample the N slowest spans at regular intervals and save the down-sampled spans into a new measurement, perhaps called slowest_spans, so we would end up querying spans from the slowest_spans measurement. I think we can handle this in a new PR.

Member (Author) replied:

I appreciate the notes on continuous queries! I had originally seen these but didn't understand exactly what they did; your explanation makes it much clearer. Certainly a good idea to investigate in a follow-up PR.
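
A rough sketch of the continuous-query idea discussed above, for reference only; it is not part of this PR. The CQ name, database name ("appdash"), interval, number of spans, and the slowest_spans measurement are all assumptions:

	// Illustrative only: down-sample the five slowest spans per name into a
	// hypothetical slowest_spans measurement every hour.
	const downsampleSlowestSpansCQ = `
	CREATE CONTINUOUS QUERY "cq_slowest_spans" ON "appdash"
	BEGIN
	  SELECT TOP("duration", 5), "trace_id"
	  INTO "slowest_spans"
	  FROM "spans"
	  GROUP BY time(1h), "name"
	END`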

q += fmt.Sprintf(
" WHERE time >= '%s' AND time <= '%s'",
time.Now().Add(start).UTC().Format(time.RFC3339Nano),
time.Now().Add(end).UTC().Format(time.RFC3339Nano),
)
q += ` GROUP BY "name"`
result, err := in.executeOneQuery(q)
if err != nil {
return nil, err
}

// Populate the results.
results := make([]*AggregatedResult, len(result.Series))
for i, row := range result.Series {
v := row.Values[0]
mean, min, max, stddev, count := v[1], v[2], v[3], v[4], v[5]
results[i] = &AggregatedResult{
RootSpanName: row.Tags["name"],
Average: time.Duration(mustJSONFloat64(mean) * float64(time.Second)),
Contributor commented:

Could we replace float64(time.Second) with a variable (declared and assigned around line 209) to avoid repeating the same calculation?
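// A minimal sketch of this suggestion (illustrative only): compute the
// conversion factor once before the loop,
//
//	secs := float64(time.Second)
//
// and then write e.g. time.Duration(mustJSONFloat64(mean) * secs) for each
// field, so the same conversion expression is not repeated.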

Min: time.Duration(mustJSONFloat64(min) * float64(time.Second)),
Max: time.Duration(mustJSONFloat64(max) * float64(time.Second)),
StdDev: time.Duration(mustJSONFloat64(stddev) * float64(time.Second)),
Samples: mustJSONInt64(count),
}
}
if len(result.Series) == 0 {
return nil, nil
}

n := 5
if n > len(result.Series) {
n = len(result.Series)
}

// Add in the N-slowest trace IDs for each span.
//
// TODO(slimsag): make N a pagination parameter instead.
result, err = in.executeOneQuery(fmt.Sprintf(`SELECT TOP("duration",%d),trace_id FROM spans GROUP BY "name"`, n))
if err != nil {
return nil, err
}
for i, row := range result.Series {
if row.Tags["name"] != results[i].RootSpanName {
panic("expectation violated") // never happens, just for sanity.
Contributor commented:

Could we just return an error instead of panicking (also at line 247), so that library users don't have to deal with this?

Member (Author) replied:

We could, but if this ever happens ideally we would have some "very angry user" reporting it. That is, this should never ever happen and, in the event that it does, it is 100% a bug in Appdash. I would hate for an error like that to bubble up somewhere and have someone think it is their responsibility to handle it.

}
for _, vals := range row.Values {
id, err := ParseID(vals[2].(string))
@chris-ramon (Contributor) commented on Apr 18, 2016:

Should we be relying on a fixed index to get the trace_id here? Perhaps we should check for the column named "trace_id" and then obtain its value, something like: https://github.com/sourcegraph/appdash/blob/master/influxdb_store.go#L576

Member (Author) replied:

Very good idea! I will fix this.
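// A minimal sketch of the column-name lookup suggested above (illustrative
// only; it assumes row.Columns holds the result column names, as used
// elsewhere in this file):
//
//	traceIDIndex := -1
//	for idx, col := range row.Columns {
//		if col == "trace_id" {
//			traceIDIndex = idx
//			break
//		}
//	}
//	// ...then read vals[traceIDIndex] rather than the fixed vals[2].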

if err != nil {
panic(err) // never happens, just for sanity.
}
results[i].Slowest = append(results[i].Slowest, id)
}
}
return results, nil
}

func (in *InfluxDBStore) Traces(opts TracesOpts) ([]*Trace, error) {
traces := make([]*Trace, 0)
rootSpansQuery := fmt.Sprintf("SELECT * FROM spans WHERE parent_id='%s'", zeroID)
@@ -598,6 +705,8 @@ func spansFromRow(row influxDBModels.Row) ([]*Span, error) {

// Checks if current column is some span's ID, if so set to the span & continue with next field.
switch column {
case "name", "duration":
continue // aggregation
case "trace_id":
traceID, err := fieldToSpanID(field, errFieldType)
if err != nil {
34 changes: 34 additions & 0 deletions store.go
@@ -41,6 +41,40 @@ type Queryer interface {
Traces(opts TracesOpts) ([]*Trace, error)
}

// AggregatedResult represents a set of traces that were aggregated together by
// root span name to produce some useful metrics (average trace time, minimum
// time, a link to the slowest traces, etc).
type AggregatedResult struct {
// RootSpanName is the name of the root span of the traces that were
// aggregated to form this result.
RootSpanName string

// Average, Minimum, Maximum, and standard deviation of the total trace
// times (earliest span start time, latest span end time) of all traces
// that were aggregated to produce this result, respectively.
Average, Min, Max, StdDev time.Duration

// Samples is the number of traces that were sampled in order to produce
// this result.
Samples int64

// Slowest is the N-slowest trace IDs that were part of this group, such
// that these are the most valuable/slowest traces for inspection.
Slowest []ID
}

// Aggregator is a type of store that can aggregate its trace data and return
// results about it.
type Aggregator interface {
// Aggregate should return the aggregated data for all traces within the
// past 72 hours, such that:
//
// Aggregate(-72 * time.Hour, 0)
//
// would return all possible results.
Aggregate(start, end time.Duration) ([]*AggregatedResult, error)
}

// NewMemoryStore creates a new in-memory store
func NewMemoryStore() *MemoryStore {
return &MemoryStore{
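A minimal usage sketch of the new Aggregator interface (illustrative only; it assumes `store` is an *appdash.InfluxDBStore, or any Aggregator, created as in the webapp example above, with fmt, log, and time imported):

	// Aggregate all traces from the last 72 hours up to now.
	results, err := store.Aggregate(-72*time.Hour, 0)
	if err != nil {
		log.Fatal(err)
	}
	for _, r := range results {
		// Each result groups traces by root span name.
		fmt.Printf("%s: avg=%v min=%v max=%v stddev=%v samples=%d slowest=%d\n",
			r.RootSpanName, r.Average, r.Min, r.Max, r.StdDev, r.Samples, len(r.Slowest))
	}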
5 changes: 3 additions & 2 deletions traceapp/app.go
@@ -35,8 +35,9 @@ import (
type App struct {
*Router

Store appdash.Store
Queryer appdash.Queryer
Store appdash.Store
Queryer appdash.Queryer
Aggregator appdash.Aggregator

tmplLock sync.Mutex
tmpls map[string]*htmpl.Template