diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 00000000..2712462b
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,10 @@
+# Changelog
+
+- Apr 15, 2016 - **Breaking Changes!**
+  - [#136](https://github.com/sourcegraph/appdash/pull/136) Users must now call `Recorder.Finish` when finished recording, or else data will not be collected.
+  - [#136](https://github.com/sourcegraph/appdash/pull/136) AggregateStore is removed in favor of InfluxDBStore, which is also embeddable, and is generally faster and more reliable. Refer to the [cmd/webapp-influxdb](https://github.com/sourcegraph/appdash/blob/master/examples/cmd/webapp-influxdb/main.go#L50) example for details on how to migrate to `InfluxDBStore`, or [read more about why this change was made](https://github.com/sourcegraph/appdash/issues/137).
+  - [#136](https://github.com/sourcegraph/appdash/issues/136) `AggregateEvent`, `Trace.IsAggregate`, and `Trace.Aggregated` are removed.
+- Mar 28, 2016
+  - [#110](https://github.com/sourcegraph/appdash/pull/110) Added support for the [OpenTracing API](http://opentracing.io/).
+- Mar 9, 2016
+  - [#99](https://github.com/sourcegraph/appdash/pull/99) Added an embeddable InfluxDB storage engine.
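
For clarity on the first breaking change: a `Recorder` now buffers its annotations until it is explicitly finished. A minimal sketch of the new pattern, using only calls that appear in the tests removed below (the span name and the in-memory collector are placeholders):

```go
package main

import (
	"log"

	"sourcegraph.com/sourcegraph/appdash"
)

func main() {
	store := appdash.NewMemoryStore() // any appdash.Collector works here
	rec := appdash.NewRecorder(appdash.NewRootSpanID(), store)
	rec.Name("my-operation") // placeholder span name
	// ... record events on rec as before ...
	rec.Finish() // required as of #136; without it, nothing reaches the store
	if errs := rec.Errors(); len(errs) > 0 {
		log.Fatal(errs)
	}
}
```
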
diff --git a/aggregate.go b/aggregate.go
deleted file mode 100644
index c049ab19..00000000
--- a/aggregate.go
+++ /dev/null
@@ -1,612 +0,0 @@
-package appdash
-
-import (
-	"encoding/json"
-	"errors"
-	"fmt"
-	"log"
-	"sort"
-	"sync"
-	"time"
-)
-
-func init() {
-	RegisterEvent(AggregateEvent{})
-}
-
-// AggregateEvent represents an aggregated set of timespan events. This is the
-// only type of event produced by the AggregateStore type.
-type AggregateEvent struct {
-	// The root span name of every item in this aggregated set of timespan events.
-	Name string
-
-	// Trace IDs for the slowest of the above times (useful for inspection).
-	Slowest []ID
-}
-
-// Schema implements the Event interface.
-func (e AggregateEvent) Schema() string { return "aggregate" }
-
-// TODO(slimsag): do not encode aggregate events in JSON. We have to do this for
-// now because the reflection code can't handle *Trace types sufficiently.
-
-// MarshalEvent implements the EventMarshaler interface.
-func (e AggregateEvent) MarshalEvent() (Annotations, error) {
-	// Encode the entire event as JSON.
-	data, err := json.Marshal(e)
-	if err != nil {
-		return nil, err
-	}
-	return Annotations{
-		{Key: "JSON", Value: data},
-	}, nil
-}
-
-// UnmarshalEvent implements the EventUnmarshaler interface.
-func (e AggregateEvent) UnmarshalEvent(as Annotations) (Event, error) {
-	// Find the annotation with our key.
-	for _, ann := range as {
-		if ann.Key != "JSON" {
-			continue
-		}
-		err := json.Unmarshal(ann.Value, &e)
-		if err != nil {
-			return nil, fmt.Errorf("AggregateEvent.UnmarshalEvent: %v", err)
-		}
-		return e, nil
-	}
-	return nil, errors.New("expected one annotation with key=\"JSON\"")
-}
-
-// spanGroupSlowest represents one of the slowest traces in a span group.
-type spanGroupSlowest struct {
-	TraceID    ID        // Root span ID of the slowest trace.
-	Start, End time.Time // Start and end time of the slowest trace.
-}
-
-// empty tells if this spanGroupSlowest slot is empty / uninitialized.
-func (s spanGroupSlowest) empty() bool {
-	return s == spanGroupSlowest{}
-}
-
-// spanGroup represents all of the times for the root spans (i.e. traces) of the
-// given name. It also contains the N-slowest traces of the group.
-type spanGroup struct {
-	// Trace is the trace ID that the generated AggregateEvent has been placed
-	// into for collection.
-	Trace     SpanID
-	Name      string             // Root span name (e.g. the route for httptrace).
-	Times     [][2]time.Time     // Aggregated timespans for the traces.
-	TimeSpans []ID               // SpanID.Span of each associated TimespanEvent for the Times slice
-	Slowest   []spanGroupSlowest // N-slowest traces in the group.
-}
-
-func (s spanGroup) Len() int      { return len(s.Slowest) }
-func (s spanGroup) Swap(i, j int) { s.Slowest[i], s.Slowest[j] = s.Slowest[j], s.Slowest[i] }
-func (s spanGroup) Less(i, j int) bool {
-	a := s.Slowest[i]
-	b := s.Slowest[j]
-
-	// A sorts before B if it took a greater amount of time than B (slowest
-	// to-fastest sorting).
-	return a.End.Sub(a.Start) > b.End.Sub(b.Start)
-}
-
-// update updates the span group to account for a potentially slowest trace,
-// returning whether or not the given trace was indeed slowest. The timespan ID
-// is the SpanID.Span of the TimespanEvent for future removal upon eviction.
-func (s *spanGroup) update(start, end time.Time, timespan ID, trace ID, remove func(trace ID)) bool {
-	s.Times = append(s.Times, [2]time.Time{start, end})
-	s.TimeSpans = append(s.TimeSpans, timespan)
-
-	// The s.Slowest list is kept sorted from slowest to fastest. As we want to
-	// steal the slot from the fastest (or zero) one we iterate over it
-	// backwards comparing times.
-	for i := len(s.Slowest) - 1; i > 0; i-- {
-		sm := s.Slowest[i]
-		if sm.TraceID == trace {
-			// Trace is already inside the group as one of the slowest.
-			return false
-		}
-
-		// If our time is lesser than the trace in the slot already, we aren't
-		// slower so don't steal the slot.
-		if end.Sub(start) < sm.End.Sub(sm.Start) {
-			continue
-		}
-
-		// If there is already a trace inside this group (i.e. we are taking its
-		// spot as one of the slowest), then we must request for its removal from
-		// the output store.
-		if sm.TraceID != 0 {
-			remove(sm.TraceID)
-		}
-
-		s.Slowest[i] = spanGroupSlowest{
-			TraceID: trace,
-			Start:   start,
-			End:     end,
-		}
-		sort.Sort(s)
-		return true
-	}
-	return false
-}
-
-// evictBefore evicts all times in the group
-func (s *spanGroup) evictBefore(tnano int64, debug bool, deleteSub func(s SpanID)) {
-	count := 0
-search:
-	for i, ts := range s.Times {
-		if ts[0].UnixNano() < tnano {
-			s.Times = append(s.Times[:i], s.Times[i+1:]...)
-
-			// Remove the associated subspan holding the TimespanEvent in the
-			// output MemoryStore.
-			id := s.TimeSpans[i]
-			s.TimeSpans = append(s.TimeSpans[:i], s.TimeSpans[i+1:]...)
-			deleteSub(SpanID{Trace: s.Trace.Trace, Span: id, Parent: s.Trace.Span})
-
-			count++
-			goto search
-		}
-	}
-
-	if debug && count > 0 {
-		log.Printf("AggregateStore: evicted %d timespans from the group %q", count, s.Name)
-	}
-}
-
-// The AggregateStore collection process can be described as follows:
-//
-// 1. Collection on AggregateStore occurs.
-// 3. Collection is sent directly to pre-storage
-//    - i.e. LimitStore backed by its own MemoryStore.
-// 4. Eviction runs if needed.
-//    - Every group has an eviction process ran; removes times older than 72/hrs.
-//    - Each N-slowest trace in the group older than 72hr is evicted from output.
-//    - Empty span groups (no trace over past 72/hr) are removed entirely.
-// 5. Find a group for the collection
-//    - Only succeeds if a spanName has or is being been collected.
-//    - Otherwise collections end up in pre-storage until we get the spanName.
-// 6. Collection is unmarshaled into a set of events, trace time is determined.
-// 7. Group is updated to consider the collection as being one of the N-slowest.
-//    - Older N-slowest trace is removed.
-// 8. N-slowest trace collections that are in pre-storage:
-//    - Removed from pre-storage.
-//    - Placed into output MemoryStore.
-// 9. Data Storage
-//    - Aggregation data is stored as a phony trace (so same storage backends can be used).
-//    - The old AggregationEvent is removed from output MemoryStore.
-//    - The new AggregationEvent with updated N-slowest trace IDs is inserted.
-//    - A TimespanEvent (subspan) is recorded into the trace.
-//      - Not stored in AggregationEvent as a slice (because O(N) vs O(1) performance for updates).
-
-// AggregateStore aggregates timespan events into groups based on the root span
-// name. Much like a RecentStore, it evicts aggregated events after a certain
-// time period.
-type AggregateStore struct {
-	// MinEvictAge is the minimum age of group data before it is evicted.
-	MinEvictAge time.Duration
-
-	// MaxRate is the maximum expected rate of incoming traces.
-	//
-	// Multiple traces can be collected at once, and they must be queued up in
-	// memory until the entire trace has been collected, otherwise the N-slowest
-	// traces cannot be stored.
-	//
-	// If the number is too large, a lot of memory will be used (to store
-	// MaxRate number of traces), and if too small some then some aggregate
-	// events will not have the full N-slowest traces associated with them.
-	MaxRate int
-
-	// NSlowest is the number of slowest traces to fully keep for inspection for
-	// each group.
-	NSlowest int
-
-	// Debug is whether or not to log debug messages.
-	Debug bool
-
-	// MemoryStore is the memory store were aggregated traces are saved to and
-	// deleted from. It is the final destination for traces.
-	*MemoryStore
-
-	mu           sync.Mutex
-	groups       map[ID]*spanGroup // map of trace ID to span group.
-	insertTimes  map[ID]time.Time  // map of times that groups was inserted into at
-	groupsByName map[string]ID     // looks up a groups trace ID by name.
-	pre          *LimitStore       // traces which do not have span groups yet
-	lastEvicted  time.Time         // last time that eviction ran
-}
-
-// NewAggregateStore is short-hand for:
-//
-//  store := &AggregateStore{
-//      MinEvictAge: 72 * time.Hour,
-//      MaxRate:     4096,
-//      NSlowest:    5,
-//      MemoryStore: NewMemoryStore(),
-//  }
-//
-func NewAggregateStore() *AggregateStore {
-	return &AggregateStore{
-		MinEvictAge: 72 * time.Hour,
-		MaxRate:     4096,
-		NSlowest:    5,
-		MemoryStore: NewMemoryStore(),
-	}
-}
-
-// Collect calls the underlying store's Collect, deleting the oldest
-// trace if the capacity has been reached.
-func (as *AggregateStore) Collect(id SpanID, anns ...Annotation) error {
-	as.mu.Lock()
-	defer as.mu.Unlock()
-
-	// Initialization
-	if as.groups == nil {
-		as.groups = make(map[ID]*spanGroup)
-		as.insertTimes = make(map[ID]time.Time)
-		as.groupsByName = make(map[string]ID)
-		as.pre = &LimitStore{
-			Max:         as.MaxRate,
-			DeleteStore: NewMemoryStore(),
-		}
-		go as.clearGroups()
-	}
-
-	if as.Debug {
-		// Determine the total number of traces and times in each named span
-		// group.
-		nTraces := 0
-		nTimes := 0
-		for _, id := range as.groupsByName {
-			g, ok := as.groups[id]
-			if !ok {
-				continue
-			}
-			for _, sm := range g.Slowest {
-				if sm.TraceID != 0 {
-					nTraces++
-				}
-			}
-			nTimes += len(g.Times)
-		}
-
-		// Log some statistics: these can be used to identify serious issues
-		// relating to overstorage or memory leakage in the primary data maps.
-		msTraces, err := as.MemoryStore.Traces()
-		if err != nil {
-			log.Println(err)
-		}
-		exceeding := len(msTraces) - (len(as.groupsByName) * as.NSlowest)
-		if exceeding < 0 {
-			exceeding = 0
-		}
-		nextEvict := as.MinEvictAge - time.Since(as.lastEvicted)
-		log.Printf("AggregateStore: [%d groups by ID] [%d groups by name] [%d-slowest traces] [%d trace times]\n", len(as.groups), len(as.groupsByName), nTraces, nTimes)
-		log.Printf("AggregateStore: [%d traces in MemoryStore; exceeding us by %d] [eviction in %s]\n", len(msTraces), exceeding, nextEvict)
-
-		// Validate that the N-slowest traces we store are not exceeding what the user asked for.
-		if nTraces > 0 && (len(as.groupsByName)/nTraces) > as.NSlowest {
-			log.Println("AggregateStore: WARNING: Have too many N-slowest traces for each span group:")
-			for _, id := range as.groupsByName {
-				g := as.groups[id]
-				log.Printf("AggregateStore: %q has %d-slowest traces\n", g.Name, len(g.Slowest))
-			}
-		}
-	}
-
-	// Collect into the limit store.
-	if err := as.pre.Collect(id, anns...); err != nil {
-		return err
-	}
-
-	// Consider eviction of old data.
-	if time.Since(as.lastEvicted) > as.MinEvictAge {
-		// Function for evictBefore to invoke when removing TimespanEvents that
-		// we've previously stored in the output MemoryStore.
-		deleteSub := func(id SpanID) {
-			as.MemoryStore.Lock()
-			if !as.MemoryStore.deleteSubNoLock(id, false) {
-				panic("failed to delete spanID")
-			}
-			as.MemoryStore.Unlock()
-		}
-		if err := as.evictBefore(time.Now().Add(-1*as.MinEvictAge), deleteSub); err != nil {
-			return err
-		}
-	}
-
-	// Grab the group for our span.
-	group, ok := as.group(id, anns...)
-	if !ok {
-		// We don't have a group for the trace, and can't create one (the
-		// spanName event isn't present yet).
-		return nil
-	}
-
-	// Unmarshal the events.
-	var events []Event
-	if err := UnmarshalEvents(anns, &events); err != nil {
-		return err
-	}
-
-	// Find the start and end time of the trace.
-	eStart, eEnd, ok := findTraceTimes(events)
-	if !ok {
-		// We didn't find any timespan events at all, so we're done here.
-		return nil
-	}
-
-	// Update the group to consider this trace being one of the slowest.
-	timespanID := NewSpanID(group.Trace)
-	group.update(eStart, eEnd, timespanID.Span, id.Trace, func(trace ID) {
-		// Delete the request trace from the output store.
-		if err := as.deleteOutput(trace); err != nil {
-			log.Printf("AggregateStore: failed to delete a trace: %s", err)
-		}
-	})
-
-	// Move traces from the limit store into the group, as needed.
-	for _, slowest := range group.Slowest {
-		// Find the trace in the limit store.
-		trace, err := as.pre.Trace(slowest.TraceID)
-		if err == ErrTraceNotFound {
-			continue
-		}
-		if err != nil {
-			return err
-		}
-
-		// Place into output store.
-		var walk func(t *Trace) error
-		walk = func(t *Trace) error {
-			err := as.MemoryStore.Collect(t.Span.ID, t.Span.Annotations...)
-			if err != nil {
-				return err
-			}
-			for _, sub := range t.Sub {
-				if err := walk(sub); err != nil {
-					return err
-				}
-			}
-			return nil
-		}
-		if err := walk(trace); err != nil {
-			return err
-		}
-
-		// Delete from the limit store.
-		err = as.pre.Delete(slowest.TraceID)
-		if err != nil {
-			return err
-		}
-	}
-
-	// Prepare the aggregation event (before locking below).
-	ev := &AggregateEvent{
-		Name: group.Name,
-	}
-	for _, slowest := range group.Slowest {
-		if !slowest.empty() {
-			ev.Slowest = append(ev.Slowest, slowest.TraceID)
-		}
-	}
-	if as.Debug && len(ev.Slowest) == 0 {
-		log.Printf("AggregateStore: no slowest traces for group %q (consider increasing MaxRate)", group.Name)
-	}
-
-	// Prepare the timespan event (also before locking below).
-	tev := &timespanEvent{
-		S: eStart,
-		E: eEnd,
-	}
-
-	// As we're updating the aggregation event, we go ahead and delete the old
-	// one now. We do this all under as.MemoryStore.Lock otherwise users (e.g. the
-	// web UI) can pull from as.MemoryStore when the trace has been deleted.
-	as.MemoryStore.Lock()
-	defer as.MemoryStore.Unlock()
-	as.MemoryStore.deleteSubNoLock(group.Trace, true)
-
-	// Record an aggregate event with the given name.
-	recEvent := func(e Event, spanID SpanID) error {
-		anns, err := MarshalEvent(e)
-		if err != nil {
-			return err
-		}
-		return as.MemoryStore.collectNoLock(spanID, anns...)
-	}
-	if err := recEvent(spanName{Name: group.Name}, group.Trace); err != nil {
-		return err
-	}
-	if err := recEvent(ev, group.Trace); err != nil {
-		return err
-	}
-
-	// Record the timespan event as a subspan of the aggregation event.
-	if err := recEvent(tev, timespanID); err != nil {
-		return err
-	}
-	return nil
-}
-
-// deleteOutput deletes the given traces from the output memory store.
-func (as *AggregateStore) deleteOutput(traces ...ID) error {
-	for _, trace := range traces {
-		if err := as.MemoryStore.Delete(trace); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-// group returns the span group that the collection belongs in, or nil, false if
-// no such span group exists / could be created.
-//
-// The as.mu lock must be held for this method to operate safely.
-func (as *AggregateStore) group(id SpanID, anns ...Annotation) (*spanGroup, bool) {
-	// Do nothing if we already have a group associated with our root span.
-	if group, ok := as.groups[id.Trace]; ok {
-		return group, true
-	}
-
-	// At this point, we need a root span or else we can't create the group.
-	if !id.IsRoot() {
-		return nil, false
-	}
-
-	// And likewise, always a name event.
-	var name spanName
-	if err := UnmarshalEvent(anns, &name); err != nil {
-		return nil, false
-	}
-
-	// If there already exists a group with that name, then we just associate
-	// our trace with that group for future lookup and we're good to go.
-	if groupID, ok := as.groupsByName[name.Name]; ok {
-		group := as.groups[groupID]
-		as.groups[id.Trace] = group
-		as.insertTimes[id.Trace] = time.Now()
-		return group, true
-	}
-
-	// Create a new group, and associate our trace with it.
-	group := &spanGroup{
-		Name:    name.Name,
-		Trace:   NewRootSpanID(),
-		Slowest: make([]spanGroupSlowest, as.NSlowest),
-	}
-	as.groups[id.Trace] = group
-	as.insertTimes[id.Trace] = time.Now()
-	as.groupsByName[name.Name] = id.Trace
-	return group, true
-}
-
-// clearGroups removes IDs from as.groups once they are old enough to no longer
-// need to be alive (i.e. after we're certain no more collections will occur for
-// that ID). It is used so the map does not leak memory.
-//
-// TODO(slimsag): find a more correct solution to this. Maybe we can get rid of
-// as.groups all-together and have no need for clearing them here?
-func (as *AggregateStore) clearGroups() {
-	deleteAfter := 30 * time.Second
-	for {
-		time.Sleep(deleteAfter)
-
-		as.mu.Lock()
-	removal:
-		for id, _ := range as.groups {
-			if time.Since(as.insertTimes[id]) > deleteAfter {
-				for _, nameID := range as.groupsByName {
-					if id == nameID {
-						continue removal
-					}
-				}
-				delete(as.insertTimes, id)
-				delete(as.groups, id)
-			}
-		}
-		as.mu.Unlock()
-	}
-}
-
-// evictBefore evicts aggregation events that were created before t.
-//
-// The as.mu lock must be held for this method to operate safely.
-func (as *AggregateStore) evictBefore(t time.Time, deleteSub func(id SpanID)) error {
-	evictStart := time.Now()
-	as.lastEvicted = evictStart
-	tnano := t.UnixNano()
-
-	// Build a list of aggregation events to evict.
-	var toEvict []ID
-	for _, group := range as.groups {
-		group.evictBefore(tnano, as.Debug, deleteSub)
-
-	searchSlowest:
-		for i, sm := range group.Slowest {
-			if !sm.empty() && sm.Start.UnixNano() < tnano {
-				group.Slowest = append(group.Slowest[:i], group.Slowest[i+1:]...)
-				toEvict = append(toEvict, sm.TraceID)
-				goto searchSlowest
-			}
-		}
-
-		// If the group is not completely empty, we have nothing more to do.
-		if len(group.Times) > 0 || len(group.Slowest) > 0 {
-			continue
-		}
-
-		// Remove the empty group from the maps, noting that as.groups often
-		// has multiple references to the same group.
-		for id, g := range as.groups {
-			if g == group {
-				delete(as.groups, id)
-			}
-		}
-		delete(as.groupsByName, group.Name)
-
-		// Also request removal of the group (AggregateEvent) from the output store.
-		err := as.deleteOutput(group.Trace.Trace)
-		if err != nil {
-			return err
-		}
-	}
-
-	// We are done if there is nothing to evict.
-	if len(toEvict) == 0 {
-		return nil
-	}
-
-	if as.Debug {
-		log.Printf("AggregateStore: deleting %d slowest traces created before %s (age check took %s)", len(toEvict), t, time.Since(evictStart))
-	}
-
-	// Spawn separate goroutine so we don't hold the as.mu lock.
-	go func() {
-		deleteStart := time.Now()
-		if err := as.deleteOutput(toEvict...); err != nil {
-			log.Printf("AggregateStore: failed to delete slowest traces: %s", err)
-		}
-		if as.Debug {
-			log.Printf("AggregateStore: finished deleting %d slowest traces created before %s (took %s)", len(toEvict), t, time.Since(deleteStart))
-		}
-	}()
-	return nil
-}
-
-// findTraceTimes finds the minimum and maximum timespan event times for the
-// given set of events, or returns ok == false if there are no such events.
-func findTraceTimes(events []Event) (start, end time.Time, ok bool) {
-	// Find the start and end time of the trace.
-	var (
-		eStart, eEnd time.Time
-		haveTimes    = false
-	)
-	for _, e := range events {
-		e, ok := e.(TimespanEvent)
-		if !ok {
-			continue
-		}
-		if !haveTimes {
-			haveTimes = true
-			eStart = e.Start()
-			eEnd = e.End()
-			continue
-		}
-		if v := e.Start(); v.UnixNano() < eStart.UnixNano() {
-			eStart = v
-		}
-		if v := e.End(); v.UnixNano() > eEnd.UnixNano() {
-			eEnd = v
-		}
-	}
-	if !haveTimes {
-		// We didn't find any timespan events at all, so we're done here.
-		ok = false
-		return
-	}
-	return eStart, eEnd, true
-}
diff --git a/aggregate_test.go b/aggregate_test.go
deleted file mode 100644
index f2818640..00000000
--- a/aggregate_test.go
+++ /dev/null
@@ -1,261 +0,0 @@
-package appdash
-
-import (
-	"reflect"
-	"testing"
-	"time"
-)
-
-// Verify the event type satisfies the interfaces.
-var _ = EventMarshaler(AggregateEvent{})
-var _ = EventUnmarshaler(AggregateEvent{})
-var _ = Event(AggregateEvent{})
-
-// TestAggregateStore tests basic AggregateStore functionality.
-func TestAggregateStore(t *testing.T) {
-	// Create an aggregate store.
-	ms := NewMemoryStore()
-	as := &AggregateStore{
-		MinEvictAge: 72 * time.Hour,
-		MaxRate:     4096,
-		NSlowest:    5,
-		MemoryStore: ms,
-	}
-
-	// Record a few traces under the same name.
-	for i := 0; i < 10; i++ {
-		root := NewRootSpanID()
-		rec := NewRecorder(root, as)
-		rec.Name("the-trace-name")
-		e := timespanEvent{
-			S: time.Now().Add(time.Duration(-i) * time.Minute),
-			E: time.Now(),
-		}
-		rec.Event(e)
-		rec.Finish()
-		if errs := rec.Errors(); len(errs) > 0 {
-			t.Fatal(errs)
-		}
-	}
-
-	// Verify the recorded traces.
-	traces, err := ms.Traces()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	// One trace is the aggregated one, the other 5 are the N-slowest full
-	// traces.
-	if len(traces) != 6 {
-		t.Fatalf("expected 6 traces got %d", len(traces))
-	}
-
-	// Verify we have the aggregated trace events.
-	var agg *AggregateEvent
-	for _, tr := range traces {
-		ev, _, err := tr.Aggregated()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if ev != nil {
-			agg = ev
-		}
-	}
-	if agg == nil {
-		t.Fatalf("expected 1 aggregated trace event, found nil")
-	}
-
-	// Verify we have the N-slowest other full traces.
-	var found []ID
-	for _, t := range traces {
-		for _, want := range agg.Slowest {
-			if t.Span.ID.Trace == want {
-				found = append(found, want)
-			}
-		}
-	}
-	if len(found) != as.NSlowest {
-		t.Fatalf("expected %d N-slowest full traces, found %d", as.NSlowest, len(found))
-	}
-}
-
-// TestAggregateStoreNSlowest tests that the AggregateStore.NSlowest field is
-// operating correctly.
-func TestAggregateStoreNSlowest(t *testing.T) {
-	// Create an aggregate store.
-	ms := NewMemoryStore()
-	as := &AggregateStore{
-		MinEvictAge: 72 * time.Hour,
-		MaxRate:     4096,
-		NSlowest:    5,
-		MemoryStore: ms,
-	}
-
-	now := time.Now()
-
-	insert := func(times []time.Duration) []time.Duration {
-		// Record a few traces under the same name.
-		for i := 0; i < len(times); i++ {
-			root := NewRootSpanID()
-			rec := NewRecorder(root, as)
-			rec.Name("the-trace-name")
-			e := timespanEvent{
-				S: now,
-				E: now.Add(times[i]),
-			}
-			rec.Event(e)
-			rec.Finish()
-			if errs := rec.Errors(); len(errs) > 0 {
-				t.Fatal(errs)
-			}
-		}
-
-		// Query the traces from the memory store.
-		traces, err := ms.Traces()
-		if err != nil {
-			t.Fatal(err)
-		}
-
-		// One trace is the aggregated one, the other 5 are the N-slowest full
-		// traces.
-		if len(traces) != as.NSlowest+1 {
-			t.Fatalf("expected %d traces got %d", as.NSlowest+1, len(traces))
-		}
-
-		// Verify we have the aggregated trace events.
-		var agg *AggregateEvent
-		for _, tr := range traces {
-			ev, _, err := tr.Aggregated()
-			if err != nil {
-				t.Fatal(err)
-			}
-			if ev != nil {
-				agg = ev
-			}
-		}
-		if agg == nil {
-			t.Fatalf("expected 1 aggregated trace event, found nil")
-		}
-
-		// Determine time of each slowest trace.
-		var d []time.Duration
-		for _, slowest := range agg.Slowest {
-			st, err := ms.Trace(slowest)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			// Unmarshal the events.
-			var events []Event
-			if err := UnmarshalEvents(st.Annotations, &events); err != nil {
-				t.Fatal(err)
-			}
-
-			start, end, ok := findTraceTimes(events)
-			if !ok {
-				t.Fatal("no timespane events")
-			}
-			d = append(d, end.Sub(start))
-		}
-		return d
-	}
-
-	// Insert ten basic values to start with.
-	want := []time.Duration{
-		5 * time.Minute,
-		5 * time.Minute,
-		4 * time.Minute,
-		4 * time.Minute,
-		3 * time.Minute,
-	}
-	got := insert([]time.Duration{
-		2 * time.Minute,
-		3 * time.Minute,
-		5 * time.Minute,
-		4 * time.Minute,
-		1 * time.Minute,
-		4 * time.Minute,
-		2 * time.Minute,
-		5 * time.Minute,
-		3 * time.Minute,
-		1 * time.Minute,
-	})
-	if !reflect.DeepEqual(got, want) {
-		t.Logf("got %v\n", got)
-		t.Fatalf("want %v", want)
-	}
-
-	// Now we insert a sixth value which should overtake the smallest duration.
-	want = []time.Duration{
-		6 * time.Minute,
-		5 * time.Minute,
-		5 * time.Minute,
-		4 * time.Minute,
-		4 * time.Minute,
-	}
-	got = insert([]time.Duration{6 * time.Minute})
-	if !reflect.DeepEqual(got, want) {
-		t.Logf("got %v\n", got)
-		t.Fatalf("want %v", want)
-	}
-}
-
-func TestAggregateStoreMinEvictAge(t *testing.T) {
-	// Create an aggregate store.
-	ms := NewMemoryStore()
-	as := &AggregateStore{
-		MinEvictAge: 1 * time.Second,
-		MaxRate:     4096,
-		NSlowest:    5,
-		MemoryStore: ms,
-	}
-
-	// Record a few traces.
-	for i := 0; i < 10; i++ {
-		root := NewRootSpanID()
-		rec := NewRecorder(root, as)
-		rec.Name("the-trace-name")
-		e := timespanEvent{
-			S: time.Now().Add(time.Duration(-i) * time.Minute),
-			E: time.Now(),
-		}
-		rec.Event(e)
-		rec.Finish()
-		if errs := rec.Errors(); len(errs) > 0 {
-			t.Fatal(errs)
-		}
-	}
-
-	// Verify the number of recorded traces.
-	traces, err := ms.Traces()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(traces) != 6 {
-		t.Fatalf("expected 6 traces got %d", len(traces))
-	}
-
-	// Wait so that next collection will cause eviction.
-	time.Sleep(as.MinEvictAge)
-
-	// Trigger the eviction by making any sort of collection.
-	rec := NewRecorder(NewRootSpanID(), as)
-	rec.Name("collect")
-	rec.Finish()
-	if errs := rec.Errors(); len(errs) > 0 {
-		t.Fatal(errs)
-	}
-
-	// Wait for deletion to occur (it happens in a separate goroutine and we
-	// don't want to introduce synchronization just for this test).
-	time.Sleep(1 * time.Second)
-
-	// Verify the eviction.
-	traces, err = ms.Traces()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(traces) != 0 {
-		t.Fatalf("expected 0 traces got %d", len(traces))
-	}
-}
diff --git a/examples/cmd/webapp-influxdb/main.go b/examples/cmd/webapp-influxdb/main.go
index 083b3cc2..28027ca7 100644
--- a/examples/cmd/webapp-influxdb/main.go
+++ b/examples/cmd/webapp-influxdb/main.go
@@ -64,6 +64,7 @@ func main() {
 	tapp := traceapp.New(nil)
 	tapp.Store = store
 	tapp.Queryer = store
+	tapp.Aggregator = store
 	log.Println("Appdash web UI running on HTTP :8700")
 	go func() {
 		log.Fatal(http.ListenAndServe(":8700", tapp))
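
The one-line change above is the entire web UI migration: a single `InfluxDBStore` now serves as the store, the queryer, and the aggregator. A hedged sketch of the surrounding wiring, assuming the `appdash.NewInfluxDBStore` constructor used by the example (its configuration is elided here; see the full example file for the real setup):

```go
store, err := appdash.NewInfluxDBStore(conf) // conf elided; see the example file
if err != nil {
	log.Fatal(err)
}
tapp := traceapp.New(nil)
tapp.Store = store      // collects spans
tapp.Queryer = store    // lists traces
tapp.Aggregator = store // powers the dashboard
log.Fatal(http.ListenAndServe(":8700", tapp))
```
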
diff --git a/influxdb_store.go b/influxdb_store.go
index 62e66fbc..2aa7d181 100644
--- a/influxdb_store.go
+++ b/influxdb_store.go
@@ -1,6 +1,7 @@
 package appdash
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
 	"net/url"
@@ -65,15 +66,52 @@ func (in *InfluxDBStore) Collect(id SpanID, anns ...Annotation) error {
 		"parent_id": id.Parent.String(),
 	}
 
+	// Find the start and end time of the span.
+	var events []Event
+	if err := UnmarshalEvents(anns, &events); err != nil {
+		return err
+	}
+	var (
+		foundItems int
+		name       string
+		duration   time.Duration
+	)
+	for _, ev := range events {
+		switch v := ev.(type) {
+		case spanName:
+			foundItems++
+			name = v.Name
+		case TimespanEvent:
+			foundItems++
+			duration = v.End().Sub(v.Start())
+		}
+	}
+
+	// Annotations `anns` are set as fields (InfluxDB does not index fields).
 	fields := make(map[string]interface{}, len(anns))
 	for _, ann := range anns {
 		fields[ann.Key] = string(ann.Value)
 	}
+	// If we have span name and duration, set them as a tag and field.
+	if foundItems == 2 {
+		tags["name"] = name
+		fields["duration"] = float64(duration) / float64(time.Second)
+	}
+
 	// `schemasFieldName` field contains all the schemas found on `anns`.
 	// Eg. fields[schemasFieldName] = "HTTPClient,HTTPServer"
 	fields[schemasFieldName] = schemasFromAnnotations(anns)
+
+	// Tag values cannot contain newlines or they mess up the WRITE and cause
+	// errors.
+	//
+	// TODO: investigate why this is; possibly related to https://github.com/influxdata/influxdb/issues/3545
+	// which was only for fields (not tags).
+	for k, v := range tags {
+		tags[k] = strings.Replace(v, "\n", " ", -1)
+	}
+
 	p := &influxDBClient.Point{
 		Measurement: spanMeasurementName,
 		Tags:        tags,
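
With the change above, a span that carries both a span-name event and a timespan event is written with an indexed `name` tag and a `duration` field measured in seconds, which is what the aggregation queries further down group and aggregate by. As a rough, illustrative sketch only (field set abbreviated, all values hypothetical), the resulting InfluxDB line-protocol point would look something like:

```
spans,name=Request.Serve,trace_id=3f4e...,span_id=55c3...,parent_id=0000... duration=0.042,... 1460000000000000000
```
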
@@ -145,9 +183,123 @@ func (in *InfluxDBStore) Trace(id ID) (*Trace, error) {
 	return trace, nil
 }
 
-func (in *InfluxDBStore) Traces() ([]*Trace, error) {
+func mustJSONFloat64(x interface{}) float64 {
+	n := x.(json.Number)
+	v, err := n.Float64()
+	if err != nil {
+		panic(err)
+	}
+	return v
+}
+
+func mustJSONInt64(x interface{}) int64 {
+	n := x.(json.Number)
+	v, err := n.Int64()
+	if err != nil {
+		panic(err)
+	}
+	return v
+}
+
+// Aggregate implements the Aggregator interface.
+func (in *InfluxDBStore) Aggregate(start, end time.Duration) ([]*AggregatedResult, error) {
+	// Find the mean (average), minimum, maximum, std. deviation, and count of
+	// all spans.
+	q := `SELECT MEAN("duration"),MIN("duration"),MAX("duration"),STDDEV("duration"),COUNT("duration") from spans`
+	q += fmt.Sprintf(
+		" WHERE time >= '%s' AND time <= '%s'",
+		time.Now().Add(start).UTC().Format(time.RFC3339Nano),
+		time.Now().Add(end).UTC().Format(time.RFC3339Nano),
+	)
+	q += ` GROUP BY "name"`
+	result, err := in.executeOneQuery(q)
+	if err != nil {
+		return nil, err
+	}
+
+	// Populate the results.
+	results := make([]*AggregatedResult, len(result.Series))
+	for i, row := range result.Series {
+		v := row.Values[0]
+		mean, min, max, stddev, count := v[1], v[2], v[3], v[4], v[5]
+		if stddev == nil {
+			// stddev will be nil when there were not enough items to be able
+			// to calculate a standard deviation.
+			stddev = json.Number("0")
+		}
+
+		results[i] = &AggregatedResult{
+			RootSpanName: row.Tags["name"],
+			Average:      time.Duration(mustJSONFloat64(mean) * float64(time.Second)),
+			Min:          time.Duration(mustJSONFloat64(min) * float64(time.Second)),
+			Max:          time.Duration(mustJSONFloat64(max) * float64(time.Second)),
+			StdDev:       time.Duration(mustJSONFloat64(stddev) * float64(time.Second)),
+			Samples:      mustJSONInt64(count),
+		}
+	}
+	if len(result.Series) == 0 {
+		return nil, nil
+	}
+
+	n := 5
+	if n > len(result.Series) {
+		n = len(result.Series)
+	}
+
+	// Add in the N-slowest trace IDs for each span.
+	//
+	// TODO(slimsag): make N a pagination parameter instead.
+	result, err = in.executeOneQuery(fmt.Sprintf(`SELECT TOP("duration",%d),trace_id FROM spans GROUP BY "name"`, n))
+	if err != nil {
+		return nil, err
+	}
+	for rowIndex, row := range result.Series {
+		if row.Tags["name"] != results[rowIndex].RootSpanName {
+			panic("expectation violated") // never happens, just for sanity.
+		}
+		for _, fields := range row.Values {
+			for i, field := range fields {
+				switch row.Columns[i] {
+				case "trace_id":
+					id, err := ParseID(field.(string))
+					if err != nil {
+						panic(err) // never happens, just for sanity.
+					}
+					results[rowIndex].Slowest = append(results[rowIndex].Slowest, id)
+				}
+			}
+		}
+	}
+	return results, nil
+}
+
+func (in *InfluxDBStore) Traces(opts TracesOpts) ([]*Trace, error) {
 	traces := make([]*Trace, 0)
-	rootSpansQuery := fmt.Sprintf("SELECT * FROM spans WHERE parent_id='%s' LIMIT %d", zeroID, in.tracesPerPage)
+	rootSpansQuery := fmt.Sprintf("SELECT * FROM spans WHERE parent_id='%s'", zeroID)
+
+	// Extend `rootSpansQuery` with a time range filter using the start/end values from `opts.Timespan`.
+	if opts.Timespan != (Timespan{}) {
+		start := opts.Timespan.S.UTC().Format(time.RFC3339Nano)
+		end := opts.Timespan.E.UTC().Format(time.RFC3339Nano)
+		rootSpansQuery += fmt.Sprintf(" AND time >= '%s' AND time <= '%s'", start, end)
+	}
+
+	// Extend `rootSpansQuery` with a filter to include only those traces present in `opts.TraceIDs`.
+	traceIDsLen := len(opts.TraceIDs)
+	if traceIDsLen > 0 {
+		rootSpansQuery += " AND ("
+		for i, traceID := range opts.TraceIDs {
+			rootSpansQuery += fmt.Sprintf("trace_id = '%s'", traceID)
+			lastIteration := (i+1 == traceIDsLen)
+			if !lastIteration {
+				rootSpansQuery += " OR "
+			}
+		}
+		rootSpansQuery += ")"
+	} else { // Otherwise continue limiting the number of traces to be returned.
+		rootSpansQuery += fmt.Sprintf(" LIMIT %d", in.tracesPerPage)
+	}
+
 	rootSpansResult, err := in.executeOneQuery(rootSpansQuery)
 	if err != nil {
 		return nil, err
@@ -574,6 +726,8 @@ func spansFromRow(row influxDBModels.Row) ([]*Span, error) {
 
 		// Checks if current column is some span's ID, if so set to the span & continue with next field.
 		switch column {
+		case "name", "duration":
+			continue // aggregation
 		case "trace_id":
 			traceID, err := fieldToSpanID(field, errFieldType)
 			if err != nil {
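
A sketch of consuming the `Aggregate` method added above (store construction is elided, and `fmt`/`log` imports are assumed; the negative start offset follows the convention documented in store.go further below):

```go
// Aggregate everything from 72 hours ago up to now.
results, err := store.Aggregate(-72*time.Hour, 0)
if err != nil {
	log.Fatal(err)
}
for _, r := range results {
	fmt.Printf("%s: avg=%v min=%v max=%v stddev=%v samples=%d slowest=%v\n",
		r.RootSpanName, r.Average, r.Min, r.Max, r.StdDev, r.Samples, r.Slowest)
}
```
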
It is - // a placeholder method that will be removed when other, more - // useful methods are added to Queryer. - Traces() ([]*Trace, error) + // Traces returns an implementation-defined list of traces according to the options. + Traces(opts TracesOpts) ([]*Trace, error) +} + +// AggregatedResult represents a set of traces that were aggregated together by +// root span name to produce some useful metrics (average trace time, minimum +// time, a link to the slowest traces, etc). +type AggregatedResult struct { + // RootSpanName is the name of the root span of the traces that were + // aggregated to form this result. + RootSpanName string + + // Average, Minimum, Maximum, and standard deviation of the total trace + // times (earliest span start time, latest span end time) of all traces + // that were aggregated to produce this result, respectively. + Average, Min, Max, StdDev time.Duration + + // Samples is the number of traces that were sampled in order to produce + // this result. + Samples int64 + + // Slowest is the N-slowest trace IDs that were part of this group, such + // that these are the most valuable/slowest traces for inspection. + Slowest []ID +} + +// Aggregator is a type of store that can aggregate its trace data and return +// results about it. +type Aggregator interface { + // Aggregate should return the aggregated data for all traces within the + // past 72/hr, such that: + // + // Aggregate(-72 * time.Hour, 0) + // + // would return all possible results. + Aggregate(start, end time.Duration) ([]*AggregatedResult, error) } // NewMemoryStore creates a new in-memory store @@ -215,7 +256,7 @@ func (ms *MemoryStore) traceNoLock(id ID) (*Trace, error) { } // Traces implements the Queryer interface. -func (ms *MemoryStore) Traces() ([]*Trace, error) { +func (ms *MemoryStore) Traces(opts TracesOpts) ([]*Trace, error) { ms.Lock() defer ms.Unlock() diff --git a/store_test.go b/store_test.go index 0b1f06d6..bcf083c4 100644 --- a/store_test.go +++ b/store_test.go @@ -350,7 +350,7 @@ func TestRecentStore(t *testing.T) { rs.MustCollect(SpanID{1, 2, 3}) rs.MustCollect(SpanID{2, 3, 4}) - traces, _ := ms.Traces() + traces, _ := ms.Traces(TracesOpts{}) if len(traces) != 2 { t.Errorf("got traces %v, want %d total", traces, 2) } @@ -358,7 +358,7 @@ func TestRecentStore(t *testing.T) { time.Sleep(2 * age) rs.MustCollect(SpanID{3, 4, 5}) time.Sleep(2 * age) - traces, _ = ms.Traces() + traces, _ = ms.Traces(TracesOpts{}) if len(traces) != 1 { t.Errorf("got traces %v, want %d total", traces, 1) } @@ -373,30 +373,30 @@ func TestLimitStore(t *testing.T) { ms := NewMemoryStore() rs := &storeT{t, &LimitStore{DeleteStore: ms, Max: 2}} - if traces, _ := ms.Traces(); len(traces) != 0 { + if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 0 { t.Errorf("got traces %v, want %d total", traces, 0) } rs.MustCollect(SpanID{1, 2, 3}) - if traces, _ := ms.Traces(); len(traces) != 1 { + if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 1 { t.Errorf("got traces %v, want %d total", traces, 1) } rs.MustCollect(SpanID{2, 3, 4}) - if traces, _ := ms.Traces(); len(traces) != 2 { + if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 2 { t.Errorf("got traces %v, want %d total", traces, 2) } rs.MustCollect(SpanID{3, 4, 5}) rs.MustCollect(SpanID{3, 5, 6}) - if traces, _ := ms.Traces(); len(traces) != 2 { + if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 2 { t.Errorf("got traces %v, want %d total", traces, 2) } - traces, _ := ms.Traces() + traces, _ := ms.Traces(TracesOpts{}) want := []*Trace{ {Span: 
diff --git a/store_test.go b/store_test.go
index 0b1f06d6..bcf083c4 100644
--- a/store_test.go
+++ b/store_test.go
@@ -350,7 +350,7 @@ func TestRecentStore(t *testing.T) {
 	rs.MustCollect(SpanID{1, 2, 3})
 	rs.MustCollect(SpanID{2, 3, 4})
-	traces, _ := ms.Traces()
+	traces, _ := ms.Traces(TracesOpts{})
 	if len(traces) != 2 {
 		t.Errorf("got traces %v, want %d total", traces, 2)
 	}
@@ -358,7 +358,7 @@ func TestRecentStore(t *testing.T) {
 	time.Sleep(2 * age)
 	rs.MustCollect(SpanID{3, 4, 5})
 	time.Sleep(2 * age)
-	traces, _ = ms.Traces()
+	traces, _ = ms.Traces(TracesOpts{})
 	if len(traces) != 1 {
 		t.Errorf("got traces %v, want %d total", traces, 1)
 	}
@@ -373,30 +373,30 @@ func TestLimitStore(t *testing.T) {
 	ms := NewMemoryStore()
 	rs := &storeT{t, &LimitStore{DeleteStore: ms, Max: 2}}
 
-	if traces, _ := ms.Traces(); len(traces) != 0 {
+	if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 0 {
 		t.Errorf("got traces %v, want %d total", traces, 0)
 	}
 
 	rs.MustCollect(SpanID{1, 2, 3})
-	if traces, _ := ms.Traces(); len(traces) != 1 {
+	if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 1 {
 		t.Errorf("got traces %v, want %d total", traces, 1)
 	}
 
 	rs.MustCollect(SpanID{2, 3, 4})
-	if traces, _ := ms.Traces(); len(traces) != 2 {
+	if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 2 {
 		t.Errorf("got traces %v, want %d total", traces, 2)
 	}
 
 	rs.MustCollect(SpanID{3, 4, 5})
 	rs.MustCollect(SpanID{3, 5, 6})
-	if traces, _ := ms.Traces(); len(traces) != 2 {
+	if traces, _ := ms.Traces(TracesOpts{}); len(traces) != 2 {
 		t.Errorf("got traces %v, want %d total", traces, 2)
 	}
 
-	traces, _ := ms.Traces()
+	traces, _ := ms.Traces(TracesOpts{})
 	want := []*Trace{
 		{Span: Span{ID: SpanID{2, 3, 4}}},
 		{
diff --git a/trace.go b/trace.go
index 89f64e3a..d9cd2039 100644
--- a/trace.go
+++ b/trace.go
@@ -3,9 +3,11 @@ package appdash
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"strings"
+	"time"
 )
 
 // A Trace is a tree of spans.
@@ -45,59 +47,16 @@ func (t *Trace) TreeString() string {
 	return buf.String()
 }
 
-// IsAggregate tells if the trace contains any AggregateEvents (it is therefor
-// said to be a set of aggregated traces).
-func (t *Trace) IsAggregate() bool {
-	aggSchema := schemaPrefix + AggregateEvent{}.Schema()
-	var walk func(t *Trace) bool
-	walk = func(t *Trace) bool {
-		for _, ann := range t.Annotations {
-			if ann.Key == aggSchema {
-				return true
-			}
-		}
-		for _, sub := range t.Sub {
-			if walk(sub) {
-				return true
-			}
-		}
-		return false
-	}
-	return walk(t)
-}
-
-// Aggregated returns the aggregate event (or nil if none is found) along with
-// all of the TimespanEvents found in this trace.
-func (t *Trace) Aggregated() (*AggregateEvent, []TimespanEvent, error) {
-	var (
-		agg       *AggregateEvent
-		timespans []TimespanEvent
-		walk      func(t *Trace) error
-	)
-	walk = func(t *Trace) error {
-		var evs []Event
-		err := UnmarshalEvents(t.Annotations, &evs)
-		if err != nil {
-			return err
-		}
-		for _, ev := range evs {
-			if a, ok := ev.(AggregateEvent); ok {
-				agg = &a
-			} else if t, ok := ev.(TimespanEvent); ok {
-				timespans = append(timespans, t)
-			}
-		}
-		for _, sub := range t.Sub {
-			if err := walk(sub); err != nil {
-				return err
-			}
-		}
-		return nil
+// TimespanEvent returns a timespan event describing the overall time span of
+// the trace (earliest timespan start, latest timespan end).
+func (t *Trace) TimespanEvent() (TimespanEvent, error) {
+	var events []Event
+	if err := UnmarshalEvents(t.Annotations, &events); err != nil {
+		return timespanEvent{}, err
 	}
-	if err := walk(t); err != nil {
-		return nil, nil, err
+	start, end, ok := findTraceTimes(events)
+	if !ok {
+		return timespanEvent{}, errors.New("time span event not found")
 	}
-	return agg, timespans, nil
+	return timespanEvent{S: start, E: end}, nil
 }
 
 func (t *Trace) treeString(w io.Writer, depth int) {
@@ -131,6 +90,40 @@ func (t *Trace) treeString(w io.Writer, depth int) {
 	}
 }
 
+// findTraceTimes finds the minimum and maximum timespan event times for the
+// given set of events, or returns ok == false if there are no such events.
+func findTraceTimes(events []Event) (start, end time.Time, ok bool) {
+	// Find the start and end time of the trace.
+	var (
+		eStart, eEnd time.Time
+		haveTimes    = false
+	)
+	for _, e := range events {
+		e, ok := e.(TimespanEvent)
+		if !ok {
+			continue
+		}
+		if !haveTimes {
+			haveTimes = true
+			eStart = e.Start()
+			eEnd = e.End()
+			continue
+		}
+		if v := e.Start(); v.UnixNano() < eStart.UnixNano() {
+			eStart = v
+		}
+		if v := e.End(); v.UnixNano() > eEnd.UnixNano() {
+			eEnd = v
+		}
+	}
+	if !haveTimes {
+		// We didn't find any timespan events at all, so we're done here.
+		ok = false
+		return
+	}
+	return eStart, eEnd, true
+}
+
 type tracesByIDSpan []*Trace
 
 func (t tracesByIDSpan) Len() int { return len(t) }
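
`Trace.TimespanEvent` is the replacement for the removed `Trace.Aggregated` when all a caller needs is the overall wall time of a trace. A short usage sketch (obtaining `trace` is elided; `TimespanEvent` values expose `Start()` and `End()`):

```go
ts, err := trace.TimespanEvent()
if err != nil {
	log.Fatal(err) // no timespan events on the root span
}
fmt.Println("trace took", ts.End().Sub(ts.Start()))
```
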
diff --git a/traceapp/app.go b/traceapp/app.go
index 1bbbb7c3..70a83d70 100644
--- a/traceapp/app.go
+++ b/traceapp/app.go
@@ -21,7 +21,6 @@ import (
 	"net/url"
 	"os"
 	"path"
-	"sort"
 	"strings"
 	"sync"
 
@@ -36,8 +35,9 @@ import (
 type App struct {
 	*Router
 
-	Store   appdash.Store
-	Queryer appdash.Queryer
+	Store      appdash.Store
+	Queryer    appdash.Queryer
+	Aggregator appdash.Aggregator
 
 	tmplLock sync.Mutex
 	tmpls    map[string]*htmpl.Template
@@ -153,11 +153,6 @@ func (a *App) serveTrace(w http.ResponseWriter, r *http.Request) error {
 }
 
 func (a *App) serveTraces(w http.ResponseWriter, r *http.Request) error {
-	traces, err := a.Queryer.Traces()
-	if err != nil {
-		return err
-	}
-
 	// Parse the query for a comma-separated list of traces that we should only
 	// show (all others are hidden).
 	var showJust []appdash.ID
@@ -170,10 +165,12 @@ func (a *App) serveTraces(w http.ResponseWriter, r *http.Request) error {
 		}
 	}
 
-	// Sort the traces by ID to ensure that the display order doesn't change upon
-	// multiple page reloads if Queryer.Traces is e.g. backed by a map (which has
-	// a random iteration order).
-	sort.Sort(tracesByID(traces))
+	traces, err := a.Queryer.Traces(appdash.TracesOpts{
+		TraceIDs: showJust,
+	})
+	if err != nil {
+		return err
+	}
 
 	return a.renderTemplate(w, r, "traces.html", http.StatusOK, &struct {
 		TemplateCommon
@@ -182,21 +179,6 @@ func (a *App) serveTraces(w http.ResponseWriter, r *http.Request) error {
 	}{
 		Traces: traces,
 		Visible: func(t *appdash.Trace) bool {
-			// Hide the traces that contain aggregate events (that's all they have, so
-			// they are not very useful to users).
-			if t.IsAggregate() {
-				return false
-			}
-
-			if len(showJust) > 0 {
-				// Showing just a few traces.
-				for _, id := range showJust {
-					if id == t.Span.ID.Trace {
-						return true
-					}
-				}
-				return false
-			}
 			return true
 		},
 	})
@@ -204,7 +186,7 @@ func (a *App) serveTraces(w http.ResponseWriter, r *http.Request) error {
 
 func (a *App) serveAggregate(w http.ResponseWriter, r *http.Request) error {
 	// By default we display all traces.
-	traces, err := a.Queryer.Traces()
+	traces, err := a.Queryer.Traces(appdash.TracesOpts{})
 	if err != nil {
 		return err
 	}
diff --git a/traceapp/dashboard.go b/traceapp/dashboard.go
index 3c69b7dc..9cb92836 100644
--- a/traceapp/dashboard.go
+++ b/traceapp/dashboard.go
@@ -4,14 +4,10 @@ import (
 	"bytes"
 	"encoding/json"
 	"io"
-	"math/big"
 	"net/http"
 	"strconv"
 	"strings"
 	"time"
-
-	"github.com/cznic/mathutil"
-	"sourcegraph.com/sourcegraph/appdash"
 )
 
 // dashboardRow represents a single row in the dashboard. It is encoded to JSON.
@@ -22,75 +18,6 @@ type dashboardRow struct {
 	URL string
 }
 
-// newDashboardRow returns a new dashboard row with it's items calculated from
-// the given aggregation event and timespan events (the returned row represents
-// the whole aggregation event).
-//
-// The returned row does not have the URL field set.
-func newDashboardRow(a appdash.AggregateEvent, timespans []appdash.TimespanEvent) dashboardRow {
-	row := dashboardRow{
-		Name:      a.Name,
-		Timespans: len(timespans),
-	}
-
-	// Calculate sum and mean (row.Average), while determining min/max.
-	sum := big.NewInt(0)
-	for _, ts := range timespans {
-		d := ts.End().Sub(ts.Start())
-		sum.Add(sum, big.NewInt(int64(d)))
-		if row.Min == 0 || d < row.Min {
-			row.Min = d
-		}
-		if row.Max == 0 || d > row.Max {
-			row.Max = d
-		}
-	}
-	avg := big.NewInt(0).Div(sum, big.NewInt(int64(len(timespans))))
-	row.Average = time.Duration(avg.Int64())
-
-	// Calculate std. deviation.
-	sqDiffSum := big.NewInt(0)
-	for _, ts := range timespans {
-		d := ts.End().Sub(ts.Start())
-		diff := big.NewInt(0).Sub(big.NewInt(int64(d)), avg)
-		sqDiffSum.Add(sqDiffSum, diff.Mul(diff, diff))
-	}
-	stdDev := big.NewInt(0).Div(sqDiffSum, big.NewInt(int64(len(timespans))))
-	stdDev = mathutil.SqrtBig(stdDev)
-	row.StdDev = time.Duration(stdDev.Int64())
-
-	// TODO(slimsag): if we can make the table display the data as formatted by
-	// Go (row.Average.String), we'll get much nicer display. But it means we'll
-	// need to perform custom sorting on the table (it will think "9ms" > "1m",
-	// for example).
-
-	// Divide into milliseconds.
-	row.Average = row.Average / time.Millisecond
-	row.Min = row.Min / time.Millisecond
-	row.Max = row.Max / time.Millisecond
-	row.StdDev = row.StdDev / time.Millisecond
-	return row
-}
-
-// aggTimeFilter removes timespans and slowest-trace IDs from the given
-// aggregate event if they were not defined inside the given start and end time.
-func aggTimeFilter(a appdash.AggregateEvent, timespans []appdash.TimespanEvent, start, end time.Time) (appdash.AggregateEvent, []appdash.TimespanEvent, bool) {
-	cpy := a
-	cpy.Slowest = nil
-	var cpyTimes []appdash.TimespanEvent
-	for n, ts := range timespans {
-		if ts.Start().UnixNano() < start.UnixNano() || ts.End().UnixNano() > end.UnixNano() {
-			// It started before or after the time period we want.
-			continue
-		}
-		cpyTimes = append(cpyTimes, ts)
-		if n < len(a.Slowest) {
-			cpy.Slowest = append(cpy.Slowest, a.Slowest[n])
-		}
-	}
-	return cpy, cpyTimes, len(cpyTimes) > 0
-}
-
 // serverDashboard serves the dashboard page.
 func (a *App) serveDashboard(w http.ResponseWriter, r *http.Request) error {
 	uData, err := a.Router.URLTo(DashboardDataRoute)
@@ -108,32 +35,31 @@ func (a *App) serveDashboard(w http.ResponseWriter, r *http.Request) error {
 
 // serveDashboardData serves the JSON data requested by the dashboards table.
 func (a *App) serveDashboardData(w http.ResponseWriter, r *http.Request) error {
-	traces, err := a.Queryer.Traces()
-	if err != nil {
-		return err
-	}
-
 	// Parse the query for the start & end timeline durations.
 	var (
 		query      = r.URL.Query()
-		start, end time.Time
+		start, end time.Duration
 	)
-	basis := time.Now().Add(-72 * time.Hour)
 	if s := query.Get("start"); len(s) > 0 {
 		v, err := strconv.ParseInt(s, 10, 64)
 		if err != nil {
 			return err
 		}
-		// e.g. if (v)start==0, it'll be -72hrs ago
-		start = basis.Add(time.Duration(v) * time.Hour)
+		start = time.Duration(v) * time.Hour
+		start -= 72 * time.Hour
 	}
 	if s := query.Get("end"); len(s) > 0 {
 		v, err := strconv.ParseInt(s, 10, 64)
 		if err != nil {
 			return err
 		}
-		// .eg. if (v)end==72, it'll be time.Now()
-		end = basis.Add(time.Duration(v) * time.Hour)
+		end = time.Duration(v) * time.Hour
+		end -= 72 * time.Hour
	}
+
+	results, err := a.Aggregator.Aggregate(start, end)
+	if err != nil {
+		return err
+	}
 
 	// Grab the URL to the traces page.
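
To make the new slider arithmetic above concrete: the dashboard sends `start` and `end` as hour offsets in [0, 72], and the handler shifts them into durations relative to now, which is what `Aggregate` expects. Two worked values, as a sketch:

```go
start := 0*time.Hour - 72*time.Hour // slider start=0 -> -72h (72 hours ago)
end := 72*time.Hour - 72*time.Hour  // slider end=72  ->   0 (now)
results, err := a.Aggregator.Aggregate(start, end)
```
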
@@ -142,39 +68,23 @@ func (a *App) serveDashboardData(w http.ResponseWriter, r *http.Request) error {
 		return err
 	}
 
-	// Important: If it is a nil slice it will be encoded to JSON as null, and the
-	// bootstrap-table library will not update the table with "no entries".
-	rows := make([]dashboardRow, 0)
-
-	// Produce the rows of data.
-	for _, trace := range traces {
-		// Grab the aggregation event from the trace, if any.
-		agg, timespans, err := trace.Aggregated()
-		if err != nil {
-			return err
-		}
-		if agg == nil {
-			continue // No aggregation event.
-		}
-
-		// Filter the event by our timeline.
-		a, timespans, any := aggTimeFilter(*agg, timespans, start, end)
-		if !any {
-			continue
-		}
-
-		// Create a list of slowest trace IDs (but as strings), then produce a
-		// URL which will query for it.
+	rows := make([]*dashboardRow, len(results))
+	for i, r := range results {
 		var stringIDs []string
-		for _, slowest := range a.Slowest {
+		for _, slowest := range r.Slowest {
 			stringIDs = append(stringIDs, slowest.String())
 		}
 		tracesURL.RawQuery = "show=" + strings.Join(stringIDs, ",")
 
-		// Create the row of data.
-		row := newDashboardRow(a, timespans)
-		row.URL = tracesURL.String()
-		rows = append(rows, row)
+		rows[i] = &dashboardRow{
+			Name:      r.RootSpanName,
+			Average:   r.Average / time.Millisecond,
+			Min:       r.Min / time.Millisecond,
+			Max:       r.Max / time.Millisecond,
+			StdDev:    r.StdDev / time.Millisecond,
+			Timespans: int(r.Samples),
+			URL:       tracesURL.String(),
+		}
 	}
 
 	// Encode to JSON.