diff --git a/pkg/es/client.go b/pkg/es/client.go index f7a5b6e73a0..b904be44d9e 100644 --- a/pkg/es/client.go +++ b/pkg/es/client.go @@ -61,6 +61,7 @@ type SearchService interface { Aggregation(name string, aggregation elastic.Aggregation) SearchService IgnoreUnavailable(ignoreUnavailable bool) SearchService Query(query elastic.Query) SearchService + FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) SearchService Do(ctx context.Context) (*elastic.SearchResult, error) } diff --git a/pkg/es/mocks/SearchService.go b/pkg/es/mocks/SearchService.go index cf01f949330..4f906a366ed 100644 --- a/pkg/es/mocks/SearchService.go +++ b/pkg/es/mocks/SearchService.go @@ -71,6 +71,26 @@ func (_m *SearchService) Do(ctx context.Context) (*elastic.SearchResult, error) return r0, r1 } +// FetchSourceContext provides a mock function with given fields: fetchSourceContext +func (_m *SearchService) FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) es.SearchService { + ret := _m.Called(fetchSourceContext) + + if len(ret) == 0 { + panic("no return value specified for FetchSourceContext") + } + + var r0 es.SearchService + if rf, ok := ret.Get(0).(func(*elastic.FetchSourceContext) es.SearchService); ok { + r0 = rf(fetchSourceContext) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.SearchService) + } + } + + return r0 +} + // IgnoreUnavailable provides a mock function with given fields: ignoreUnavailable func (_m *SearchService) IgnoreUnavailable(ignoreUnavailable bool) es.SearchService { ret := _m.Called(ignoreUnavailable) diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index e34b8c49590..d12e464239e 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -270,6 +270,10 @@ func (s SearchServiceWrapper) Do(ctx context.Context) (*elastic.SearchResult, er return s.searchService.Do(ctx) } +func (s SearchServiceWrapper) FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) es.SearchService { + return WrapESSearchService(s.searchService.FetchSourceContext(fetchSourceContext)) +} + // MultiSearchServiceWrapper is a wrapper around elastic.ESMultiSearchService type MultiSearchServiceWrapper struct { multiSearchService *elastic.MultiSearchService diff --git a/plugin/storage/es/spanstore/internal/dbmodel/model.go b/plugin/storage/es/spanstore/internal/dbmodel/model.go index 4308fd1330d..ef6a66db291 100644 --- a/plugin/storage/es/spanstore/internal/dbmodel/model.go +++ b/plugin/storage/es/spanstore/internal/dbmodel/model.go @@ -83,14 +83,9 @@ type KeyValue struct { Value any `json:"value"` } -// Service is the JSON struct for service:operation documents in ElasticSearch +// Service is the JSON struct for service:kind:operation documents in ElasticSearch type Service struct { ServiceName string `json:"serviceName"` + Kind string `json:"spanKind,omitempty"` OperationName string `json:"operationName"` } - -// ServiceWithKind is the JSON struct service:kind:operation documents in ElasticSearch -type ServiceWithKind struct { - Service - Kind string `json:"spanKind"` -} diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index fcc85f59d89..7c654409969 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -645,27 +645,7 @@ func testGetWithKind(typ string, t *testing.T, testKind bool) { goodAggregations[typ] = (*json.RawMessage)(&rawMessage) var filterRawMessage json.RawMessage if typ == operationsAggregation { - if !testKind { - filterRawMessage = json.RawMessage(` - { - "buckets": { - "distinct_operations_without_kind": { - "doc_count": 1, - "operationName": { - "doc_count_error_upper_bound": 0, - "sum_other_doc_count": 0, - "buckets": [ - { - "key": "123", - "doc_count": 16 - } - ] - } - } - } - } -`) - } else { + if testKind { filterRawMessage = rawMessage } goodAggregations[typ] = &filterRawMessage @@ -687,14 +667,19 @@ func testGetWithKind(typ string, t *testing.T, testKind bool) { return "search services failed: Search failure" }, }, - { + } + + if (typ == operationsAggregation && testKind) || (typ != operationsAggregation) { + testCase := testGetStruct{ caption: typ + " search error", searchResult: &elastic.SearchResult{Aggregations: elastic.Aggregations(badAggregations)}, expectedError: func() string { return "could not find aggregation of " + typ }, - }, + } + testCases = append(testCases, testCase) } + if testKind { testCases = append(testCases, testGetStruct{ caption: typ + " full behavior with kind", @@ -707,7 +692,38 @@ func testGetWithKind(typ string, t *testing.T, testKind bool) { return "" }, }) - } else { + } + + if typ == operationsAggregation && !testKind { + score := 0.6931471 + msg := json.RawMessage(`{"operationName": "123"}`) + hitModel := &elastic.SearchHits{ + TotalHits: 1, + MaxScore: &score, + Hits: []*elastic.SearchHit{ + { + Score: &score, + SeqNo: nil, + Id: "e232b0fbe5cebc85", + PrimaryTerm: nil, + Source: &msg, + }, + }, + } + testCases = append(testCases, testGetStruct{ + caption: typ + " full behavior", + searchResult: &elastic.SearchResult{Hits: hitModel}, + expectedOutput: map[string]any{ + operationsAggregation: []spanstore.Operation{{Name: "123"}}, + "default": []string{"123"}, + }, + expectedError: func() string { + return "" + }, + }) + } + + if typ != operationsAggregation { testCases = append(testCases, testGetStruct{ caption: typ + " full behavior", searchResult: &elastic.SearchResult{Aggregations: goodAggregations}, @@ -1076,15 +1092,16 @@ func mockSearchService(r *spanReaderTest) *mock.Call { func mockSearchServiceWithSpanKind(r *spanReaderTest, inputHasSpanKind bool) *mock.Call { searchService := &mocks.SearchService{} searchService.On("Query", mock.Anything).Return(searchService) - searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Size", mock.MatchedBy(func(size int) bool { return size == 0 // Aggregations apply size (bucket) limits in their own query objects, and do not apply at the parent query level. })).Return(searchService) + searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) if inputHasSpanKind { searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) } else { - searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.AnythingOfType("*elastic.FiltersAggregation")).Return(searchService) + searchService.On("FetchSourceContext", mock.AnythingOfType("*elastic.FetchSourceContext"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) + searchService.On("Size", mock.AnythingOfType("int")).Return(searchService) } searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index ec20fcac771..a4b168fc141 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -27,7 +27,6 @@ const ( serviceName = "serviceName" spanKind = "spanKind" operationsAggregation = "distinct_operations" - operationsWithoutKind = "distinct_operations_without_kind" servicesAggregation = "distinct_services" ) @@ -58,27 +57,16 @@ func NewServiceOperationStorage( // Write saves a service to operation pair. func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span, kind model.SpanKind) { - // Insert serviceName:operationName document + // Insert serviceName:kind:operationName document service := dbmodel.Service{ ServiceName: jsonSpan.Process.ServiceName, OperationName: jsonSpan.OperationName, + Kind: string(kind), } - if kind != model.SpanKindUnspecified { - serviceWithKind := dbmodel.ServiceWithKind{ - Service: service, - Kind: string(kind), - } - cacheKey := hashCodeWithKind(serviceWithKind) - if !keyInCache(cacheKey, s.serviceCache) { - s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(serviceWithKind).Add() - writeCache(cacheKey, s.serviceCache) - } - } else { - cacheKey := hashCode(service) - if !keyInCache(cacheKey, s.serviceCache) { - s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add() - writeCache(cacheKey, s.serviceCache) - } + cacheKey := hashCode(service) + if !keyInCache(cacheKey, s.serviceCache) { + s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add() + writeCache(cacheKey, s.serviceCache) } } @@ -137,32 +125,20 @@ func (s *ServiceOperationStorage) getOperations(ctx context.Context, indices []s operationNamesBucket := bucket.Buckets return bucketOfOperationNamesToOperationsArray(operationNamesBucket, kind) } - serviceFilter := elastic.NewFiltersAggregation(). - FilterWithName(string(model.SpanKindClient), elastic.NewTermQuery(spanKind, string(model.SpanKindClient))). - FilterWithName(string(model.SpanKindServer), elastic.NewTermQuery(spanKind, string(model.SpanKindServer))). - FilterWithName(string(model.SpanKindProducer), elastic.NewTermQuery(spanKind, string(model.SpanKindProducer))). - FilterWithName(string(model.SpanKindConsumer), elastic.NewTermQuery(spanKind, string(model.SpanKindConsumer))). - FilterWithName(string(model.SpanKindInternal), elastic.NewTermQuery(spanKind, string(model.SpanKindInternal))). - FilterWithName(operationsWithoutKind, elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery(spanKind))). - SubAggregation(operationNameField, elastic.NewTermsAggregation().Field(operationNameField).Size(maxDocCount)) serviceQuery := elastic.NewTermQuery(serviceName, service) searchService = s.client().Search(indices...). - Size(0). Query(serviceQuery). IgnoreUnavailable(true). - Aggregation(operationsAggregation, serviceFilter) + FetchSourceContext(elastic.NewFetchSourceContext(true).Include(spanKind, operationNameField)). + Size(maxDocCount) searchResult, err := searchService.Do(ctx) if err != nil { return nil, fmt.Errorf("search operations failed: %w", es.DetailedError(err)) } - if searchResult.Aggregations == nil { + if searchResult.Hits == nil { return []spanstore.Operation{}, nil } - bucket, found := searchResult.Aggregations.Filters(operationsAggregation) - if !found { - return nil, errors.New("could not find aggregation of " + operationsAggregation) - } - return bucketOfOperationsToOperationsArray(bucket) + return bucketOfOperationsToOperationsArray(searchResult.Hits) } func getOperationsAggregation(maxDocCount int) elastic.Query { @@ -186,57 +162,28 @@ func bucketOfOperationNamesToOperationsArray(buckets []*elastic.AggregationBucke return result, nil } -func bucketOfOperationsToOperationsArray(searchResult *elastic.AggregationBucketFilters) ([]spanstore.Operation, error) { - var result []spanstore.Operation - for name, bucket := range searchResult.NamedBuckets { - if kind, err := model.SpanKindFromString(name); err == nil { - if v, ok := bucket.Aggregations[operationNameField]; ok { - result, err = addOperationsFromRawData(v, string(kind), result) - if err != nil { - return nil, err - } - } - } else { - if name == operationsWithoutKind { - if v, ok := bucket.Aggregations[operationNameField]; ok { - result, err = addOperationsFromRawData(v, "", result) - if err != nil { - return nil, err - } - } - } +func bucketOfOperationsToOperationsArray(searchResult *elastic.SearchHits) ([]spanstore.Operation, error) { + result := make([]spanstore.Operation, len(searchResult.Hits)) + for i, hit := range searchResult.Hits { + data := hit.Source + op, err := rawMessageToOperation(data) + if err != nil { + return nil, err } + result[i] = op } return result, nil } -func addOperationsFromRawData(raw *json.RawMessage, kind string, result []spanstore.Operation) ([]spanstore.Operation, error) { - var items *elastic.AggregationBucketKeyItems - err := json.Unmarshal(*raw, &items) - if err != nil { - return nil, err +func rawMessageToOperation(data *json.RawMessage) (spanstore.Operation, error) { + var service dbmodel.Service + if err := json.Unmarshal(*data, &service); err != nil { + return spanstore.Operation{}, err } - for _, item := range items.Buckets { - str, ok := item.Key.(string) - if !ok { - return nil, errors.New("non-string key found in aggregation") - } - result = append(result, spanstore.Operation{ - Name: str, - SpanKind: kind, - }) - } - return result, nil + return spanstore.Operation{Name: service.OperationName, SpanKind: service.Kind}, nil } func hashCode(s dbmodel.Service) string { - h := fnv.New64a() - h.Write([]byte(s.ServiceName)) - h.Write([]byte(s.OperationName)) - return strconv.FormatUint(h.Sum64(), 16) -} - -func hashCodeWithKind(s dbmodel.ServiceWithKind) string { h := fnv.New64a() h.Write([]byte(s.ServiceName)) h.Write([]byte(s.Kind)) diff --git a/plugin/storage/es/spanstore/service_operation_test.go b/plugin/storage/es/spanstore/service_operation_test.go index 20a5cf380ec..17dbb3b7aa8 100644 --- a/plugin/storage/es/spanstore/service_operation_test.go +++ b/plugin/storage/es/spanstore/service_operation_test.go @@ -64,7 +64,7 @@ func TestWriteServiceWithKind(t *testing.T) { indexService.On("Index", stringMatcher(indexName)).Return(indexService) indexService.On("Type", stringMatcher(serviceType)).Return(indexService) indexService.On("Id", stringMatcher(serviceHash)).Return(indexService) - indexService.On("BodyJson", mock.AnythingOfType("dbmodel.ServiceWithKind")).Return(indexService) + indexService.On("BodyJson", mock.AnythingOfType("dbmodel.Service")).Return(indexService) indexService.On("Add") w.client.On("Index").Return(indexService)