diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 494c445383f..8eabf3dfc49 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -95,12 +95,12 @@ type SpanReader struct { // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. maxSpanAge time.Duration - maxNumSpans int serviceOperationStorage *ServiceOperationStorage spanIndexPrefix []string serviceIndexPrefix []string spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn + sourceFn sourceFn } // SpanReaderParams holds constructor params for NewSpanReader @@ -124,17 +124,19 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { client: p.Client, logger: p.Logger, maxSpanAge: p.MaxSpanAge, - maxNumSpans: p.MaxNumSpans, serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), + sourceFn: getSourceFn(p.Archive, p.MaxNumSpans), } } type timeRangeIndexFn func(indexName []string, startTime time.Time, endTime time.Time) []string +type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource + func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { if archive { var archivePrefix string @@ -159,6 +161,20 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { return timeRangeIndices } +func getSourceFn(archive bool, maxNumSpans int) sourceFn { + return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { + s := elastic.NewSearchSource(). + Query(query). + Size(defaultDocCount). + TerminateAfter(maxNumSpans) + if !archive { + s.Sort("startTime", true). + SearchAfter(nextTime) + } + return s + } +} + // timeRangeIndices returns the array of indices that we need to query, based on query params func timeRangeIndices(indexNames []string, startTime time.Time, endTime time.Time) []string { var indices []string @@ -318,16 +334,13 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st if val, ok := searchAfterTime[traceID]; ok { nextTime = val } + + s := s.sourceFn(query, nextTime) + searchRequests[i] = elastic.NewSearchRequest(). IgnoreUnavailable(true). Type(spanType). - Source( - elastic.NewSearchSource(). - Query(query). - Size(defaultDocCount). - TerminateAfter(s.maxNumSpans). - Sort("startTime", true). - SearchAfter(nextTime)) + Source(s) } // set traceIDs to empty traceIDs = nil diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 8b14cf4f4a5..619a67ff507 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -21,12 +21,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" "gopkg.in/olivere/elastic.v5" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" @@ -43,6 +45,7 @@ const ( password = "changeme" // the elasticsearch default password indexPrefix = "integration-test" tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 ) type ESStorageIntegration struct { @@ -53,7 +56,7 @@ type ESStorageIntegration struct { logger *zap.Logger } -func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error { +func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error { rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), elastic.SetBasicAuth(username, password), @@ -70,22 +73,22 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error { dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) s.DependencyReader = dependencyStore s.DependencyWriter = dependencyStore - s.initSpanstore(allTagsAsFields) + s.initSpanstore(allTagsAsFields, archive) s.CleanUp = func() error { - return s.esCleanUp(allTagsAsFields) + return s.esCleanUp(allTagsAsFields, archive) } s.Refresh = s.esRefresh - s.esCleanUp(allTagsAsFields) + s.esCleanUp(allTagsAsFields, archive) return nil } -func (s *ESStorageIntegration) esCleanUp(allTagsAsFields bool) error { +func (s *ESStorageIntegration) esCleanUp(allTagsAsFields, archive bool) error { _, err := s.client.DeleteIndex("*").Do(context.Background()) - s.initSpanstore(allTagsAsFields) + s.initSpanstore(allTagsAsFields, archive) return err } -func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) { +func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) { bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) client := eswrapper.WrapESClient(s.client, bp) spanMapping, serviceMapping := es.GetMappings(5, 1) @@ -99,14 +102,16 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) { TagDotReplacement: tagKeyDeDotChar, SpanMapping: spanMapping, ServiceMapping: serviceMapping, + Archive: archive, }) s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ Client: client, Logger: s.logger, MetricsFactory: metrics.NullFactory, IndexPrefix: indexPrefix, - MaxSpanAge: 72 * time.Hour, + MaxSpanAge: maxSpanAge, TagDotReplacement: tagKeyDeDotChar, + Archive: archive, }) } @@ -129,7 +134,7 @@ func healthCheck() error { return errors.New("elastic search is not ready") } -func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) { +func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) { if os.Getenv("STORAGE") != "elasticsearch" { t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this") } @@ -137,14 +142,49 @@ func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) { t.Fatal(err) } s := &ESStorageIntegration{} - require.NoError(t, s.initializeES(allTagsAsFields)) - s.IntegrationTestAll(t) + require.NoError(t, s.initializeES(allTagsAsFields, archive)) + + if archive { + t.Run("ArchiveTrace", s.testArchiveTrace) + } else { + s.IntegrationTestAll(t) + } } func TestElasticsearchStorage(t *testing.T) { - testElasticsearchStorage(t, false) + testElasticsearchStorage(t, false, false) +} + +func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) { + testElasticsearchStorage(t, true, false) +} + +func TestElasticsearchStorage_Archive(t *testing.T) { + testElasticsearchStorage(t, false, true) } -func TestElasticsearchStorageAllTagsAsObjectFields(t *testing.T) { - testElasticsearchStorage(t, true) +func (s *StorageIntegration) testArchiveTrace(t *testing.T) { + defer s.cleanUp(t) + tId := model.NewTraceID(uint64(11), uint64(22)) + expected := &model.Span{ + OperationName: "archive_span", + StartTime: time.Now().Add(-maxSpanAge*5), + TraceID: tId, + SpanID: model.NewSpanID(55), + References: []model.SpanRef{}, + Process: model.NewProcess("archived_service", model.KeyValues{}), + } + + require.NoError(t, s.SpanWriter.WriteSpan(expected)) + s.refresh(t) + + var actual *model.Trace + found := s.waitForCondition(t, func(t *testing.T) bool { + var err error + actual, err = s.SpanReader.GetTrace(context.Background(), tId) + return err == nil && len(actual.Spans) == 1 + }) + if !assert.True(t, found) { + CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) + } }