Skip to content

Commit

Permalink
Fix archive storage not querying old spans older than maxSpanAge (#1617)
Browse files Browse the repository at this point in the history
* Fix archive reader

Signed-off-by: Pavol Loffay <[email protected]>

* Add itest

Signed-off-by: Pavol Loffay <[email protected]>

* Use constant

Signed-off-by: Pavol Loffay <[email protected]>

* Fmt

Signed-off-by: Pavol Loffay <[email protected]>

* Move sort to not archive search

Signed-off-by: Pavol Loffay <[email protected]>

* fmt

Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay authored Jun 21, 2019
1 parent ebbd409 commit 4f5e04b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 23 deletions.
31 changes: 22 additions & 9 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ type SpanReader struct {
// The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day,
// this will be rounded down to UTC 00:00 of that day.
maxSpanAge time.Duration
maxNumSpans int
serviceOperationStorage *ServiceOperationStorage
spanIndexPrefix string
serviceIndexPrefix string
spanConverter dbmodel.ToDomain
timeRangeIndices timeRangeIndexFn
sourceFn sourceFn
}

// SpanReaderParams holds constructor params for NewSpanReader
Expand All @@ -123,17 +123,19 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
client: p.Client,
logger: p.Logger,
maxSpanAge: p.MaxSpanAge,
maxNumSpans: p.MaxNumSpans,
serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex),
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex),
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases),
sourceFn: getSourceFn(p.Archive, p.MaxNumSpans),
}
}

type timeRangeIndexFn func(indexName string, startTime time.Time, endTime time.Time) []string

type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource

func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn {
if archive {
var archivePrefix string
Expand All @@ -154,6 +156,20 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn {
return timeRangeIndices
}

func getSourceFn(archive bool, maxNumSpans int) sourceFn {
return func(query elastic.Query, nextTime uint64) *elastic.SearchSource {
s := elastic.NewSearchSource().
Query(query).
Size(defaultDocCount).
TerminateAfter(maxNumSpans)
if !archive {
s.Sort("startTime", true).
SearchAfter(nextTime)
}
return s
}
}

// timeRangeIndices returns the array of indices that we need to query, based on query params
func timeRangeIndices(indexName string, startTime time.Time, endTime time.Time) []string {
var indices []string
Expand Down Expand Up @@ -311,16 +327,13 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
if val, ok := searchAfterTime[traceID]; ok {
nextTime = val
}

s := s.sourceFn(query, nextTime)

searchRequests[i] = elastic.NewSearchRequest().
IgnoreUnavailable(true).
Type(spanType).
Source(
elastic.NewSearchSource().
Query(query).
Size(defaultDocCount).
TerminateAfter(s.maxNumSpans).
Sort("startTime", true).
SearchAfter(nextTime))
Source(s)
}
// set traceIDs to empty
traceIDs = nil
Expand Down
68 changes: 54 additions & 14 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
Expand All @@ -43,6 +45,7 @@ const (
password = "changeme" // the elasticsearch default password
indexPrefix = "integration-test"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
)

type ESStorageIntegration struct {
Expand All @@ -53,7 +56,7 @@ type ESStorageIntegration struct {
logger *zap.Logger
}

func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error {
func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error {
rawClient, err := elastic.NewClient(
elastic.SetURL(queryURL),
elastic.SetBasicAuth(username, password),
Expand All @@ -70,22 +73,22 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error {
dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix)
s.DependencyReader = dependencyStore
s.DependencyWriter = dependencyStore
s.initSpanstore(allTagsAsFields)
s.initSpanstore(allTagsAsFields, archive)
s.CleanUp = func() error {
return s.esCleanUp(allTagsAsFields)
return s.esCleanUp(allTagsAsFields, archive)
}
s.Refresh = s.esRefresh
s.esCleanUp(allTagsAsFields)
s.esCleanUp(allTagsAsFields, archive)
return nil
}

func (s *ESStorageIntegration) esCleanUp(allTagsAsFields bool) error {
func (s *ESStorageIntegration) esCleanUp(allTagsAsFields, archive bool) error {
_, err := s.client.DeleteIndex("*").Do(context.Background())
s.initSpanstore(allTagsAsFields)
s.initSpanstore(allTagsAsFields, archive)
return err
}

func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) {
func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) {
bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background())
client := eswrapper.WrapESClient(s.client, bp)
spanMapping, serviceMapping := es.GetMappings(5, 1)
Expand All @@ -99,14 +102,16 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) {
TagDotReplacement: tagKeyDeDotChar,
SpanMapping: spanMapping,
ServiceMapping: serviceMapping,
Archive: archive,
})
s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{
Client: client,
Logger: s.logger,
MetricsFactory: metrics.NullFactory,
IndexPrefix: indexPrefix,
MaxSpanAge: 72 * time.Hour,
MaxSpanAge: maxSpanAge,
TagDotReplacement: tagKeyDeDotChar,
Archive: archive,
})
}

Expand All @@ -129,22 +134,57 @@ func healthCheck() error {
return errors.New("elastic search is not ready")
}

func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) {
func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) {
if os.Getenv("STORAGE") != "elasticsearch" {
t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this")
}
if err := healthCheck(); err != nil {
t.Fatal(err)
}
s := &ESStorageIntegration{}
require.NoError(t, s.initializeES(allTagsAsFields))
s.IntegrationTestAll(t)
require.NoError(t, s.initializeES(allTagsAsFields, archive))

if archive {
t.Run("ArchiveTrace", s.testArchiveTrace)
} else {
s.IntegrationTestAll(t)
}
}

func TestElasticsearchStorage(t *testing.T) {
testElasticsearchStorage(t, false)
testElasticsearchStorage(t, false, false)
}

func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) {
testElasticsearchStorage(t, true, false)
}

func TestElasticsearchStorage_Archive(t *testing.T) {
testElasticsearchStorage(t, false, true)
}

func TestElasticsearchStorageAllTagsAsObjectFields(t *testing.T) {
testElasticsearchStorage(t, true)
func (s *StorageIntegration) testArchiveTrace(t *testing.T) {
defer s.cleanUp(t)
tId := model.NewTraceID(uint64(11), uint64(22))
expected := &model.Span{
OperationName: "archive_span",
StartTime: time.Now().Add(-maxSpanAge*5),
TraceID: tId,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
}

require.NoError(t, s.SpanWriter.WriteSpan(expected))
s.refresh(t)

var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), tId)
return err == nil && len(actual.Spans) == 1
})
if !assert.True(t, found) {
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
}
}

0 comments on commit 4f5e04b

Please sign in to comment.