From 639d71153a14916878b1083e992c2308f7255dff Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Fri, 15 Nov 2019 10:41:38 +0200 Subject: [PATCH 1/3] Fix order of the returned results from badger backend. Add additional tests to verify this as well as add the ability to request StarTime filtering for only TraceIDs. Signed-off-by: Michael Burman --- .../badger/spanstore/read_write_test.go | 47 ++- plugin/storage/badger/spanstore/reader.go | 354 +++++++++--------- .../badger/spanstore/rw_internal_test.go | 44 +-- 3 files changed, 229 insertions(+), 216 deletions(-) diff --git a/plugin/storage/badger/spanstore/read_write_test.go b/plugin/storage/badger/spanstore/read_write_test.go index d6422edc6ef..b2a734acb06 100644 --- a/plugin/storage/badger/spanstore/read_write_test.go +++ b/plugin/storage/badger/spanstore/read_write_test.go @@ -20,6 +20,7 @@ import ( "io" "io/ioutil" "log" + "math/rand" "os" "runtime/pprof" "testing" @@ -128,11 +129,6 @@ func TestValidation(t *testing.T) { params.Tags = map[string]string{"A": "B"} _, err = sr.FindTraces(context.Background(), params) assert.EqualError(t, err, "service name must be set") - - // Only StartTimeMin and Max (not supported yet) - // _, err := sr.FindTraces(context.Background(), params) - // assert.EqualError(t, err, "This query parameter is not supported yet") - }) } @@ -143,15 +139,16 @@ func TestIndexSeeks(t *testing.T) { spans := 3 tid := startT for i := 0; i < traces; i++ { + lowId := rand.Uint64() tid = tid.Add(time.Duration(time.Millisecond * time.Duration(i))) for j := 0; j < spans; j++ { s := model.Span{ TraceID: model.TraceID{ - Low: uint64(i), + Low: lowId, High: 1, }, - SpanID: model.SpanID(j), + SpanID: model.SpanID(rand.Uint64()), OperationName: fmt.Sprintf("operation-%d", j), Process: &model.Process{ ServiceName: fmt.Sprintf("service-%d", i%4), @@ -171,11 +168,19 @@ func TestIndexSeeks(t *testing.T) { }, }, } + err := sw.WriteSpan(&s) assert.NoError(t, err) } } + testOrder := func(trs []*model.Trace) { + // Assert that we returned correctly in DESC time order + for l := 1; l < len(trs); l++ { + assert.True(t, trs[l].Spans[spans-1].StartTime.Before(trs[l-1].Spans[spans-1].StartTime)) + } + } + params := &spanstore.TraceQueryParameters{ StartTimeMin: startT, StartTimeMax: startT.Add(time.Duration(time.Millisecond * 10)), @@ -185,6 +190,7 @@ func TestIndexSeeks(t *testing.T) { trs, err := sr.FindTraces(context.Background(), params) assert.NoError(t, err) assert.Equal(t, 1, len(trs)) + assert.Equal(t, spans, len(trs[0].Spans)) params.OperationName = "operation-1" trs, err = sr.FindTraces(context.Background(), params) @@ -215,6 +221,7 @@ func TestIndexSeeks(t *testing.T) { trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) assert.Equal(t, 1, len(trs)) + assert.Equal(t, spans, len(trs[0].Spans)) // Query limited amount of hits @@ -224,17 +231,24 @@ func TestIndexSeeks(t *testing.T) { trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) assert.Equal(t, 2, len(trs)) + testOrder(trs) - // Check for DESC return order + // Check for DESC return order with duration index params.NumTraces = 9 + params.DurationMin = time.Duration(30 * time.Millisecond) // Filters one + params.DurationMax = time.Duration(50 * time.Millisecond) // Filters three trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) - assert.Equal(t, 9, len(trs)) + assert.Equal(t, 5, len(trs)) + testOrder(trs) - // Assert that we fetched correctly in DESC time order - for l := 1; l < len(trs); l++ { - assert.True(t, trs[l].Spans[spans-1].StartTime.Before(trs[l-1].Spans[spans-1].StartTime)) - } + // Check for DESC return order without + params.DurationMin = 0 + params.DurationMax = 0 + trs, err = sr.FindTraces(context.Background(), params) + assert.NoError(t, err) + assert.Equal(t, 9, len(trs)) + testOrder(trs) // StartTime, endTime scan - full table scan (so technically no index seek) params = &spanstore.TraceQueryParameters{ @@ -244,7 +258,9 @@ func TestIndexSeeks(t *testing.T) { trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) - assert.Equal(t, 6, len(trs)) + assert.Equal(t, 5, len(trs)) + assert.Equal(t, spans, len(trs[0].Spans)) + testOrder(trs) // StartTime and Duration queries params.StartTimeMax = startT.Add(time.Duration(time.Hour * 10)) @@ -254,8 +270,7 @@ func TestIndexSeeks(t *testing.T) { trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) assert.Equal(t, 6, len(trs)) - assert.Equal(t, uint64(56), trs[0].Spans[0].TraceID.Low) - assert.Equal(t, uint64(51), trs[5].Spans[0].TraceID.Low) + testOrder(trs) }) } diff --git a/plugin/storage/badger/spanstore/reader.go b/plugin/storage/badger/spanstore/reader.go index a329e98e6f3..73a4146899f 100644 --- a/plugin/storage/badger/spanstore/reader.go +++ b/plugin/storage/badger/spanstore/reader.go @@ -23,7 +23,6 @@ import ( "fmt" "math" "sort" - "time" "github.com/dgraph-io/badger" "github.com/golang/protobuf/proto" @@ -69,6 +68,20 @@ type TraceReader struct { cache *CacheStore } +// executionPlan is internal structure to track the index filtering +type executionPlan struct { + startTimeMin []byte + startTimeMax []byte + + limit int + + // mergeOuter is the result of merge-join of inner and outer result sets + mergeOuter [][]byte + + // hashOuter is the hashmap for hash-join of outer resultset + hashOuter map[model.TraceID]struct{} +} + // NewTraceReader returns a TraceReader with cache func NewTraceReader(db *badger.DB, c *CacheStore) *TraceReader { return &TraceReader{ @@ -155,21 +168,16 @@ func (r *TraceReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mod } // scanTimeRange returns all the Traces found between startTs and endTs -func (r *TraceReader) scanTimeRange(startTime time.Time, endTime time.Time) ([]*model.Trace, error) { +func (r *TraceReader) scanTimeRange(plan *executionPlan) ([]model.TraceID, error) { // We need to do a full table scan - traces := make([]*model.Trace, 0) - startTs := model.TimeAsEpochMicroseconds(startTime) - endTs := model.TimeAsEpochMicroseconds(endTime) - + traceKeys := make([][]byte, 0) err := r.store.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions - opts.PrefetchValues = true + opts.PrefetchValues = false it := txn.NewIterator(opts) defer it.Close() - val := []byte{} startIndex := []byte{spanKeyPrefix} - spans := make([]*model.Span, 0) prevTraceID := []byte{} for it.Seek(startIndex); it.ValidForPrefix(startIndex); it.Next() { item := it.Item() @@ -177,47 +185,38 @@ func (r *TraceReader) scanTimeRange(startTime time.Time, endTime time.Time) ([]* key := []byte{} key = item.KeyCopy(key) - timestamp := binary.BigEndian.Uint64(key[sizeOfTraceID+1 : sizeOfTraceID+1+8]) + timestamp := key[sizeOfTraceID+1 : sizeOfTraceID+1+8] traceID := key[1 : sizeOfTraceID+1] - if timestamp >= startTs && timestamp <= endTs { - val, err := item.ValueCopy(val) - if err != nil { - return err - } - - sp, err := decodeValue(val, item.UserMeta()&encodingTypeBits) - if err != nil { - return err - } - - if bytes.Equal(prevTraceID, traceID) { - // Still processing the same one - spans = append(spans, sp) - } else { - // Process last complete span - trace := &model.Trace{ - Spans: spans, + if bytes.Compare(timestamp, plan.startTimeMin) >= 0 && bytes.Compare(timestamp, plan.startTimeMax) <= 0 { + if !bytes.Equal(traceID, prevTraceID) { + if plan.hashOuter != nil { + trID := bytesToTraceID(traceID) + if _, exists := plan.hashOuter[trID]; exists { + traceKeys = append(traceKeys, key) + } + } else { + traceKeys = append(traceKeys, key) } - traces = append(traces, trace) - - spans = make([]*model.Span, 0, cap(spans)) // Use previous cap - spans = append(spans, sp) + prevTraceID = traceID } - prevTraceID = traceID } } - if len(spans) > 0 { - trace := &model.Trace{ - Spans: spans, - } - traces = append(traces, trace) - } return nil }) - return traces, err + sort.Slice(traceKeys, func(k, h int) bool { + return bytes.Compare(traceKeys[k][sizeOfTraceID+1:sizeOfTraceID+1+8], traceKeys[h][sizeOfTraceID+1:sizeOfTraceID+1+8]) > 0 + }) + + traceIDs := make([]model.TraceID, len(traceKeys)) + + for i := 0; i < len(traceKeys); i++ { + traceIDs[i] = bytesToTraceID(traceKeys[i][1 : sizeOfTraceID+1]) + } + + return traceIDs, err } func createPrimaryKeySeekPrefix(traceID model.TraceID) []byte { @@ -243,7 +242,7 @@ func (r *TraceReader) GetOperations(ctx context.Context, service string) ([]stri // setQueryDefaults alters the query with defaults if certain parameters are not set func setQueryDefaults(query *spanstore.TraceQueryParameters) { - if query.NumTraces == 0 { + if query.NumTraces <= 0 { query.NumTraces = defaultNumTraces } } @@ -275,81 +274,138 @@ func serviceQueries(query *spanstore.TraceQueryParameters, indexSeeks [][]byte) } // indexSeeksToTraceIDs does the index scanning against badger based on the parsed index queries -func (r *TraceReader) indexSeeksToTraceIDs(query *spanstore.TraceQueryParameters, indexSeeks [][]byte, ids [][][]byte) ([][][]byte, error) { - for i, s := range indexSeeks { - indexResults, err := r.scanIndexKeys(s, query.StartTimeMin, query.StartTimeMax) +func (r *TraceReader) indexSeeksToTraceIDs(plan *executionPlan, indexSeeks [][]byte) ([]model.TraceID, error) { + + for i := len(indexSeeks) - 1; i > 0; i-- { + indexResults, err := r.scanIndexKeys(indexSeeks[i], plan) if err != nil { return nil, err } + sort.Slice(indexResults, func(k, h int) bool { + return bytes.Compare(indexResults[k], indexResults[h]) < 0 + }) + // Same traceID can be returned multiple times, but always in sorted order so checking the previous key is enough prevTraceID := []byte{} - ids = append(ids, make([][]byte, 0, len(indexResults))) - for _, k := range indexResults { - traceID := k[len(k)-sizeOfTraceID:] + innerIDs := make([][]byte, 0, len(indexSeeks)) + for j := 0; j < len(indexResults); j++ { + traceID := indexResults[j] if !bytes.Equal(prevTraceID, traceID) { - ids[i] = append(ids[i], traceID) + innerIDs = append(innerIDs, traceID) prevTraceID = traceID } } - sort.Slice(ids[i], func(k, h int) bool { - return bytes.Compare(ids[i][k], ids[i][h]) < 0 - }) + + // Merge-join current results + if plan.mergeOuter == nil { + plan.mergeOuter = innerIDs + } else { + plan.mergeOuter = mergeJoinIds(plan.mergeOuter, innerIDs) + } + } + + // Last scan should get us in correct timestamp order + ids, err := r.scanIndexKeys(indexSeeks[0], plan) + if err != nil { + return nil, err + } + + if plan.mergeOuter != nil { + // Build hash of the current merged data + plan.hashOuter = buildHash(plan, plan.mergeOuter) + plan.mergeOuter = nil + } else { + // We filter the last elements + plan.hashOuter = buildHash(plan, ids) + } + + traceIDs := filterIDs(plan, ids) + return traceIDs, nil +} + +func filterIDs(plan *executionPlan, innerIDs [][]byte) []model.TraceID { + traces := make([]model.TraceID, 0, plan.limit) + + items := 0 + for i := 0; i < len(innerIDs); i++ { + trID := bytesToTraceID(innerIDs[i]) + + if _, found := plan.hashOuter[trID]; found { + traces = append(traces, trID) + delete(plan.hashOuter, trID) // Prevent duplicate add + items++ + } + + if items == plan.limit { + return traces + } + } + + return traces +} + +func bytesToTraceID(key []byte) model.TraceID { + return model.TraceID{ + High: binary.BigEndian.Uint64(key[:8]), + Low: binary.BigEndian.Uint64(key[8:sizeOfTraceID]), } - return ids, nil } -// durationQueries checks non unique index of durations -func (r *TraceReader) durationQueries(query *spanstore.TraceQueryParameters, ids [][][]byte) [][][]byte { +func buildHash(plan *executionPlan, outerIDs [][]byte) map[model.TraceID]struct{} { + var empty struct{} + + hashed := make(map[model.TraceID]struct{}) + for i := 0; i < len(outerIDs); i++ { + trID := bytesToTraceID(outerIDs[i]) + + if plan.hashOuter != nil { + if _, exists := plan.hashOuter[trID]; exists { + hashed[trID] = empty + delete(plan.hashOuter, trID) // Filter duplications + } + } else { + hashed[trID] = empty + } + } + + return hashed +} + +// durationQueries checks non unique index of durations and returns a map for further filtering purposes +func (r *TraceReader) durationQueries(plan *executionPlan, query *spanstore.TraceQueryParameters) map[model.TraceID]struct{} { durMax := uint64(model.DurationAsMicroseconds(query.DurationMax)) durMin := uint64(model.DurationAsMicroseconds(query.DurationMin)) - startKey := make([]byte, 0, 9) - endKey := make([]byte, 0, 9) + startKey := make([]byte, 9) + endKey := make([]byte, 9) - startKey = append(startKey, durationIndexKey) // [0] = - endKey = append(endKey, durationIndexKey) + startKey[0] = durationIndexKey + endKey[0] = durationIndexKey - endVal := make([]byte, 8) if query.DurationMax == 0 { // Set MAX to infinite, if Min is missing, 0 is a fine search result for us durMax = math.MaxUint64 } - binary.BigEndian.PutUint64(endVal, durMax) - - startVal := make([]byte, 8) - binary.BigEndian.PutUint64(startVal, durMin) - - startKey = append(startKey, startVal...) - endKey = append(endKey, endVal...) + binary.BigEndian.PutUint64(endKey[1:], durMax) + binary.BigEndian.PutUint64(startKey[1:], durMin) // This is not unique index result - same TraceID can be matched from multiple spans - indexResults, _ := r.scanRangeIndex(startKey, endKey, query.StartTimeMin, query.StartTimeMax) - hashFilter := make(map[model.TraceID]struct{}, len(indexResults)) - filteredResults := make([]*model.TraceID, 0, len(indexResults)) // Max possible length - appendableResults := make([][]byte, 0, len(indexResults)) // Max possible length + indexResults, _ := r.scanRangeIndex(plan, startKey, endKey) + hashFilter := make(map[model.TraceID]struct{}) var value struct{} for _, k := range indexResults { key := k[len(k)-sizeOfTraceID:] - id := &model.TraceID{ + id := model.TraceID{ High: binary.BigEndian.Uint64(key[:8]), Low: binary.BigEndian.Uint64(key[8:]), } - if _, exists := hashFilter[*id]; !exists { - filteredResults = append(filteredResults, id) - hashFilter[*id] = value + if _, exists := hashFilter[id]; !exists { + hashFilter[id] = value } } - model.SortTraceIDs(filteredResults) - - // This is an ugly hack at this point - but has no impact on performance really - for _, tr := range filteredResults { - appendableResults = append(appendableResults, traceIDToComparableBytes(tr)) - } - - ids = append(ids, appendableResults) - return ids + return hashFilter } func mergeJoinIds(left, right [][]byte) [][]byte { @@ -382,51 +438,10 @@ func mergeJoinIds(left, right [][]byte) [][]byte { return merged } -// sortMergeIds does a sort-merge join operation to the list of TraceIDs to remove duplicates -func sortMergeIds(query *spanstore.TraceQueryParameters, ids [][][]byte) []model.TraceID { - // Key only scan is a lot faster in the badger - use sort-merge join algorithm instead of hash join since we have the keys in sorted order already - var merged [][]byte - - if len(ids) > 1 { - merged = mergeJoinIds(ids[0], ids[1]) - for i := 2; i < len(ids); i++ { - merged = mergeJoinIds(merged, ids[i]) - } - } else { - merged = ids[0] - } - - // Get top query.NumTraces results (order in DESC) - if query.NumTraces < len(merged) { - merged = merged[len(merged)-query.NumTraces:] - } - - // Results are in ASC (badger's default order), but Jaeger uses DESC, thus we need to reverse the array - for left, right := 0, len(merged)-1; left < right; left, right = left+1, right-1 { - merged[left], merged[right] = merged[right], merged[left] - } - - // Create the structs from [][]byte to TraceID - keys := make([]model.TraceID, 0, len(merged)) - - for _, key := range merged { - keys = append(keys, model.TraceID{ - High: binary.BigEndian.Uint64(key[:8]), - Low: binary.BigEndian.Uint64(key[8:]), - }) - } - - return keys -} - // FindTraces retrieves traces that match the traceQuery func (r *TraceReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { keys, err := r.FindTraceIDs(ctx, query) if err != nil { - if err == ErrNotSupported && (!query.StartTimeMax.IsZero() && !query.StartTimeMin.IsZero()) { - return r.scanTimeRange(query.StartTimeMin, query.StartTimeMax) - } - return nil, err } @@ -446,25 +461,32 @@ func (r *TraceReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQu indexSeeks := make([][]byte, 0, 1) indexSeeks = serviceQueries(query, indexSeeks) - ids := make([][][]byte, 0, len(indexSeeks)+1) - ids, err := r.indexSeeksToTraceIDs(query, indexSeeks, ids) - if err != nil { - return nil, err + startStampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(startStampBytes, model.TimeAsEpochMicroseconds(query.StartTimeMin)) + + endStampBytes := make([]byte, 8) + binary.BigEndian.PutUint64(endStampBytes, model.TimeAsEpochMicroseconds(query.StartTimeMax)) + + plan := &executionPlan{ + startTimeMin: startStampBytes, + startTimeMax: endStampBytes, + limit: query.NumTraces, } - // Only secondary range index for now (StartTime filtering should be done using the PK) if query.DurationMax != 0 || query.DurationMin != 0 { - ids = r.durationQueries(query, ids) + plan.hashOuter = r.durationQueries(plan, query) } - // Transform index seeks (both unique indexes as well as non-unique indexes) to a list of TraceIDs without duplicates - if len(ids) > 0 { - keys := sortMergeIds(query, ids) + if len(indexSeeks) > 0 { + keys, err := r.indexSeeksToTraceIDs(plan, indexSeeks) + if err != nil { + return nil, err + } + return keys, nil } - // TODO We could support here all the other scans, such as time range only. These are not currently backed by an index, so a "full table scan" of traces is required. - return nil, ErrNotSupported + return r.scanTimeRange(plan) } // validateQuery returns an error if certain restrictions are not met @@ -475,15 +497,12 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { if p.ServiceName == "" && len(p.Tags) > 0 { return ErrServiceNameNotSet } - if p.ServiceName == "" && p.OperationName != "" { return ErrServiceNameNotSet } - if p.StartTimeMin.IsZero() || p.StartTimeMax.IsZero() { return ErrStartAndEndTimeNotSet } - if !p.StartTimeMax.IsZero() && p.StartTimeMax.Before(p.StartTimeMin) { return ErrStartTimeMinGreaterThanMax } @@ -494,34 +513,34 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { } // scanIndexKeys scans the time range for index keys matching the given prefix. -func (r *TraceReader) scanIndexKeys(indexKeyValue []byte, startTimeMin time.Time, startTimeMax time.Time) ([][]byte, error) { +func (r *TraceReader) scanIndexKeys(indexKeyValue []byte, plan *executionPlan) ([][]byte, error) { indexResults := make([][]byte, 0) - startStampBytes := make([]byte, 8) - binary.BigEndian.PutUint64(startStampBytes, model.TimeAsEpochMicroseconds(startTimeMin)) - err := r.store.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false // Don't fetch values since we're only interested in the keys + opts.Reverse = true it := txn.NewIterator(opts) defer it.Close() // Create starting point for sorted index scan - startIndex := make([]byte, len(indexKeyValue)+len(startStampBytes)) + startIndex := make([]byte, len(indexKeyValue)+8+1) + startIndex[len(startIndex)-1] = 0xFF copy(startIndex, indexKeyValue) - copy(startIndex[len(indexKeyValue):], startStampBytes) + copy(startIndex[len(indexKeyValue):], plan.startTimeMax) - timeMax := model.TimeAsEpochMicroseconds(startTimeMax) - for it.Seek(startIndex); scanFunction(it, indexKeyValue, timeMax); it.Next() { + for it.Seek(startIndex); scanFunction(it, indexKeyValue, plan.startTimeMin); it.Next() { item := it.Item() // ScanFunction is a prefix scanning (since we could have for example service1 & service12) // Now we need to match only the exact key if we want to add it timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // timestamp is stored with 8 bytes if bytes.Equal(indexKeyValue, it.Item().Key()[:timestampStartIndex]) { - key := make([]byte, len(item.Key())) - copy(key, item.Key()) - indexResults = append(indexResults, key) + traceIDBytes := item.Key()[len(item.Key())-sizeOfTraceID:] + + traceIDCopy := make([]byte, sizeOfTraceID) + copy(traceIDCopy, traceIDBytes) + indexResults = append(indexResults, traceIDCopy) } } return nil @@ -531,24 +550,27 @@ func (r *TraceReader) scanIndexKeys(indexKeyValue []byte, startTimeMin time.Time } // scanFunction compares the index name as well as the time range in the index key -func scanFunction(it *badger.Iterator, indexPrefix []byte, timeIndexEnd uint64) bool { +func scanFunction(it *badger.Iterator, indexPrefix []byte, timeBytesEnd []byte) bool { if it.Item() != nil { // We can't use the indexPrefix length, because we might have the same prefixValue for non-matching cases also timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // timestamp is stored with 8 bytes - timestamp := binary.BigEndian.Uint64(it.Item().Key()[timestampStartIndex : timestampStartIndex+8]) + timestamp := it.Item().Key()[timestampStartIndex : timestampStartIndex+8] + timestampInRange := bytes.Compare(timeBytesEnd, timestamp) <= 0 + + // Check length as well to prevent theoretical case where timestamp might match with wrong index key + if len(it.Item().Key()) != len(indexPrefix)+24 { + return false + } - return bytes.HasPrefix(it.Item().Key()[:timestampStartIndex], indexPrefix) && timestamp <= timeIndexEnd + return bytes.HasPrefix(it.Item().Key()[:timestampStartIndex], indexPrefix) && timestampInRange } return false } -// scanIndexKeys scans the time range for index keys matching the given prefix. -func (r *TraceReader) scanRangeIndex(indexStartValue []byte, indexEndValue []byte, startTimeMin time.Time, startTimeMax time.Time) ([][]byte, error) { +// scanRangeIndex scans the time range for index keys matching the given prefix. +func (r *TraceReader) scanRangeIndex(plan *executionPlan, indexStartValue []byte, indexEndValue []byte) ([][]byte, error) { indexResults := make([][]byte, 0) - startStampBytes := make([]byte, 8) - binary.BigEndian.PutUint64(startStampBytes, model.TimeAsEpochMicroseconds(startTimeMin)) - err := r.store.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false // Don't fetch values since we're only interested in the keys @@ -556,11 +578,9 @@ func (r *TraceReader) scanRangeIndex(indexStartValue []byte, indexEndValue []byt defer it.Close() // Create starting point for sorted index scan - startIndex := make([]byte, len(indexStartValue)+len(startStampBytes)) + startIndex := make([]byte, len(indexStartValue)+len(plan.startTimeMin)) copy(startIndex, indexStartValue) - copy(startIndex[len(indexStartValue):], startStampBytes) - - timeIndexEnd := model.TimeAsEpochMicroseconds(startTimeMax) + copy(startIndex[len(indexStartValue):], plan.startTimeMin) for it.Seek(startIndex); scanRangeFunction(it, indexEndValue); it.Next() { item := it.Item() @@ -568,8 +588,8 @@ func (r *TraceReader) scanRangeIndex(indexStartValue []byte, indexEndValue []byt // ScanFunction is a prefix scanning (since we could have for example service1 & service12) // Now we need to match only the exact key if we want to add it timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // timestamp is stored with 8 bytes - timestamp := binary.BigEndian.Uint64(it.Item().Key()[timestampStartIndex : timestampStartIndex+8]) - if timestamp <= timeIndexEnd { + timestamp := it.Item().Key()[timestampStartIndex : timestampStartIndex+8] + if bytes.Compare(timestamp, plan.startTimeMax) <= 0 { key := make([]byte, len(item.Key())) copy(key, item.Key()) indexResults = append(indexResults, key) @@ -588,11 +608,3 @@ func scanRangeFunction(it *badger.Iterator, indexEndValue []byte) bool { } return false } - -// traceIDToComparableBytes transforms model.TraceID to BigEndian sorted []byte -func traceIDToComparableBytes(traceID *model.TraceID) []byte { - traceIDBytes := make([]byte, sizeOfTraceID) - binary.BigEndian.PutUint64(traceIDBytes, traceID.High) - binary.BigEndian.PutUint64(traceIDBytes[8:], traceID.Low) - return traceIDBytes -} diff --git a/plugin/storage/badger/spanstore/rw_internal_test.go b/plugin/storage/badger/spanstore/rw_internal_test.go index fc119033158..f1b861bc88d 100644 --- a/plugin/storage/badger/spanstore/rw_internal_test.go +++ b/plugin/storage/badger/spanstore/rw_internal_test.go @@ -14,11 +14,9 @@ package spanstore import ( - "bytes" "context" "encoding/binary" "math/rand" - "sort" "testing" "time" @@ -100,48 +98,34 @@ func TestDecodeErrorReturns(t *testing.T) { assert.Error(t, err) } -func TestSortMergeIdsDuplicateDetection(t *testing.T) { - // Different IndexSeeks return the same results - ids := make([][][]byte, 2) - ids[0] = make([][]byte, 1) - ids[1] = make([][]byte, 1) - buf := new(bytes.Buffer) - binary.Write(buf, binary.BigEndian, uint64(0)) - binary.Write(buf, binary.BigEndian, uint64(156697987635)) - b := buf.Bytes() - ids[0][0] = b - ids[1][0] = b - - query := &spanstore.TraceQueryParameters{ - NumTraces: 64, - } - - traces := sortMergeIds(query, ids) - assert.Equal(t, 1, len(traces)) -} - func TestDuplicateTraceIDDetection(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() cache := NewCacheStore(store, time.Duration(1*time.Hour), true) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour), nil) rw := NewTraceReader(store, cache) - - for i := 0; i < 8; i++ { - testSpan.SpanID = model.SpanID(i) - testSpan.StartTime = testSpan.StartTime.Add(time.Millisecond) - err := sw.WriteSpan(&testSpan) - assert.NoError(t, err) + origStartTime := testSpan.StartTime + + traceCount := 128 + for k := 0; k < traceCount; k++ { + testSpan.TraceID.Low = rand.Uint64() + for i := 0; i < 32; i++ { + testSpan.SpanID = model.SpanID(rand.Uint64()) + testSpan.StartTime = origStartTime.Add(time.Duration(rand.Int31n(8000)) * time.Millisecond) + err := sw.WriteSpan(&testSpan) + assert.NoError(t, err) + } } traces, err := rw.FindTraceIDs(context.Background(), &spanstore.TraceQueryParameters{ ServiceName: "service", + NumTraces: 128, StartTimeMax: time.Now().Add(time.Hour), StartTimeMin: testSpan.StartTime.Add(-1 * time.Hour), }) assert.NoError(t, err) - assert.Equal(t, 1, len(traces)) + assert.Equal(t, 128, len(traces)) }) } @@ -215,6 +199,7 @@ func TestMergeJoin(t *testing.T) { assert.Equal(uint32(2), binary.BigEndian.Uint32(merged[1])) } +/* func TestIndexScanReturnOrder(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() @@ -250,3 +235,4 @@ func TestIndexScanReturnOrder(t *testing.T) { })) }) } +*/ From ac4109743a06c84b55e54477a958de7cdda98c9d Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Thu, 21 Nov 2019 19:07:44 +0200 Subject: [PATCH 2/3] Address comments, add newest first check to the TestIndexSeek and fix the limit in scanTimeRange only scenario Signed-off-by: Michael Burman --- .../badger/spanstore/read_write_test.go | 36 ++++++++++++----- plugin/storage/badger/spanstore/reader.go | 13 ++++-- .../badger/spanstore/rw_internal_test.go | 40 +------------------ 3 files changed, 37 insertions(+), 52 deletions(-) diff --git a/plugin/storage/badger/spanstore/read_write_test.go b/plugin/storage/badger/spanstore/read_write_test.go index b2a734acb06..024d006f22b 100644 --- a/plugin/storage/badger/spanstore/read_write_test.go +++ b/plugin/storage/badger/spanstore/read_write_test.go @@ -138,8 +138,12 @@ func TestIndexSeeks(t *testing.T) { traces := 60 spans := 3 tid := startT + + traceOrder := make([]uint64, traces) + for i := 0; i < traces; i++ { lowId := rand.Uint64() + traceOrder[i] = lowId tid = tid.Add(time.Duration(time.Millisecond * time.Duration(i))) for j := 0; j < spans; j++ { @@ -217,7 +221,6 @@ func TestIndexSeeks(t *testing.T) { tags["error"] = "true" params.Tags = tags params.DurationMin = time.Duration(1 * time.Millisecond) - // params.DurationMax = time.Duration(1 * time.Hour) trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) assert.Equal(t, 1, len(trs)) @@ -231,23 +234,36 @@ func TestIndexSeeks(t *testing.T) { trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) assert.Equal(t, 2, len(trs)) + assert.Equal(t, traceOrder[59], trs[0].Spans[0].TraceID.Low) + assert.Equal(t, traceOrder[55], trs[1].Spans[0].TraceID.Low) testOrder(trs) // Check for DESC return order with duration index - params.NumTraces = 9 - params.DurationMin = time.Duration(30 * time.Millisecond) // Filters one - params.DurationMax = time.Duration(50 * time.Millisecond) // Filters three + params = &spanstore.TraceQueryParameters{ + StartTimeMin: startT, + StartTimeMax: startT.Add(time.Duration(time.Hour * 1)), + DurationMin: time.Duration(30 * time.Millisecond), // Filters one + DurationMax: time.Duration(50 * time.Millisecond), // Filters three + NumTraces: 9, + } trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) - assert.Equal(t, 5, len(trs)) + assert.Equal(t, 9, len(trs)) // Returns 23, we limited to 9 + + // Check the newest items are returned + assert.Equal(t, traceOrder[50], trs[0].Spans[0].TraceID.Low) + assert.Equal(t, traceOrder[42], trs[8].Spans[0].TraceID.Low) testOrder(trs) - // Check for DESC return order without + // Check for DESC return order without duration index, but still with limit params.DurationMin = 0 params.DurationMax = 0 + params.NumTraces = 7 trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) - assert.Equal(t, 9, len(trs)) + assert.Equal(t, 7, len(trs)) + assert.Equal(t, traceOrder[59], trs[0].Spans[0].TraceID.Low) + assert.Equal(t, traceOrder[53], trs[6].Spans[0].TraceID.Low) testOrder(trs) // StartTime, endTime scan - full table scan (so technically no index seek) @@ -264,12 +280,14 @@ func TestIndexSeeks(t *testing.T) { // StartTime and Duration queries params.StartTimeMax = startT.Add(time.Duration(time.Hour * 10)) - params.DurationMin = time.Duration(53 * time.Millisecond) // trace 51 (max) - params.DurationMax = time.Duration(56 * time.Millisecond) // trace 56 (min) + params.DurationMin = time.Duration(53 * time.Millisecond) // trace 51 (min) + params.DurationMax = time.Duration(56 * time.Millisecond) // trace 56 (max) trs, err = sr.FindTraces(context.Background(), params) assert.NoError(t, err) assert.Equal(t, 6, len(trs)) + assert.Equal(t, traceOrder[56], trs[0].Spans[0].TraceID.Low) + assert.Equal(t, traceOrder[51], trs[5].Spans[0].TraceID.Low) testOrder(trs) }) } diff --git a/plugin/storage/badger/spanstore/reader.go b/plugin/storage/badger/spanstore/reader.go index 73a4146899f..1aef5434613 100644 --- a/plugin/storage/badger/spanstore/reader.go +++ b/plugin/storage/badger/spanstore/reader.go @@ -207,12 +207,17 @@ func (r *TraceReader) scanTimeRange(plan *executionPlan) ([]model.TraceID, error }) sort.Slice(traceKeys, func(k, h int) bool { + // This sorts by timestamp to descending order return bytes.Compare(traceKeys[k][sizeOfTraceID+1:sizeOfTraceID+1+8], traceKeys[h][sizeOfTraceID+1:sizeOfTraceID+1+8]) > 0 }) - traceIDs := make([]model.TraceID, len(traceKeys)) + sizeCount := len(traceKeys) + if plan.limit > 0 { + sizeCount = plan.limit + } + traceIDs := make([]model.TraceID, sizeCount) - for i := 0; i < len(traceKeys); i++ { + for i := 0; i < len(traceKeys) && i < sizeCount; i++ { traceIDs[i] = bytesToTraceID(traceKeys[i][1 : sizeOfTraceID+1]) } @@ -377,8 +382,8 @@ func (r *TraceReader) durationQueries(plan *executionPlan, query *spanstore.Trac durMax := uint64(model.DurationAsMicroseconds(query.DurationMax)) durMin := uint64(model.DurationAsMicroseconds(query.DurationMin)) - startKey := make([]byte, 9) - endKey := make([]byte, 9) + startKey := make([]byte, 1+8) + endKey := make([]byte, 1+8) startKey[0] = durationIndexKey endKey[0] = durationIndexKey diff --git a/plugin/storage/badger/spanstore/rw_internal_test.go b/plugin/storage/badger/spanstore/rw_internal_test.go index f1b861bc88d..3919a061448 100644 --- a/plugin/storage/badger/spanstore/rw_internal_test.go +++ b/plugin/storage/badger/spanstore/rw_internal_test.go @@ -119,7 +119,7 @@ func TestDuplicateTraceIDDetection(t *testing.T) { traces, err := rw.FindTraceIDs(context.Background(), &spanstore.TraceQueryParameters{ ServiceName: "service", - NumTraces: 128, + NumTraces: 256, // Default is 100, we want to fetch more than there should be StartTimeMax: time.Now().Add(time.Hour), StartTimeMin: testSpan.StartTime.Add(-1 * time.Hour), }) @@ -198,41 +198,3 @@ func TestMergeJoin(t *testing.T) { assert.Equal(2, len(merged)) assert.Equal(uint32(2), binary.BigEndian.Uint32(merged[1])) } - -/* -func TestIndexScanReturnOrder(t *testing.T) { - runWithBadger(t, func(store *badger.DB, t *testing.T) { - testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour), true) - sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour), nil) - rw := NewTraceReader(store, cache) - - for i := 0; i < 1000; i++ { - testSpan.TraceID = model.TraceID{ - High: rand.Uint64(), - Low: uint64(rand.Int63()), - } - testSpan.SpanID = model.SpanID(rand.Uint64()) - testSpan.StartTime = testSpan.StartTime.Add(time.Duration(i) * time.Millisecond) - err := sw.WriteSpan(&testSpan) - assert.NoError(t, err) - } - - tqp := &spanstore.TraceQueryParameters{ - ServiceName: "service", - StartTimeMax: testSpan.StartTime.Add(time.Hour), - StartTimeMin: testSpan.StartTime.Add(-1 * time.Hour), - } - - indexSeeks := make([][]byte, 0, 1) - indexSeeks = serviceQueries(tqp, indexSeeks) - - ids := make([][][]byte, 0, len(indexSeeks)+1) - - indexResults, _ := rw.indexSeeksToTraceIDs(tqp, indexSeeks, ids) - assert.True(t, sort.SliceIsSorted(indexResults[0], func(i, j int) bool { - return bytes.Compare(indexResults[0][i], indexResults[0][j]) < 0 - })) - }) -} -*/ From 66d9333b5c9e67fb2b275195ad1f543360449b74 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 25 Nov 2019 11:41:23 +0200 Subject: [PATCH 3/3] Reduce allocation of []TraceID in scanTimeRange to limit or length of returned values, whichever is smallest Signed-off-by: Michael Burman --- plugin/storage/badger/spanstore/reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/storage/badger/spanstore/reader.go b/plugin/storage/badger/spanstore/reader.go index 1aef5434613..cf3507b1452 100644 --- a/plugin/storage/badger/spanstore/reader.go +++ b/plugin/storage/badger/spanstore/reader.go @@ -212,12 +212,12 @@ func (r *TraceReader) scanTimeRange(plan *executionPlan) ([]model.TraceID, error }) sizeCount := len(traceKeys) - if plan.limit > 0 { + if plan.limit > 0 && plan.limit < sizeCount { sizeCount = plan.limit } traceIDs := make([]model.TraceID, sizeCount) - for i := 0; i < len(traceKeys) && i < sizeCount; i++ { + for i := 0; i < sizeCount; i++ { traceIDs[i] = bytesToTraceID(traceKeys[i][1 : sizeOfTraceID+1]) }