Skip to content

Commit

Permalink
Query simplified
Browse files Browse the repository at this point in the history
Signed-off-by: Manik2708 <[email protected]>
  • Loading branch information
Manik2708 committed Jan 5, 2025
1 parent e3a883e commit 03a7004
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 71 deletions.
5 changes: 2 additions & 3 deletions cmd/jaeger/internal/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ func TestElasticsearchStorage(t *testing.T) {
s := &E2EStorageIntegration{
ConfigFile: "../../config-elasticsearch.yaml",
StorageIntegration: integration.StorageIntegration{
CleanUp: purge,
Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"),
GetOperationsMissingSpanKind: true,
CleanUp: purge,
Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"),
},
}
s.e2eInitialize(t, "elasticsearch")
Expand Down
1 change: 1 addition & 0 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type SearchService interface {
Size(size int) SearchService
Aggregation(name string, aggregation elastic.Aggregation) SearchService
IgnoreUnavailable(ignoreUnavailable bool) SearchService
FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) SearchService
Query(query elastic.Query) SearchService
Do(ctx context.Context) (*elastic.SearchResult, error)
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/es/mocks/SearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ func (s SearchServiceWrapper) Query(query elastic.Query) es.SearchService {
return WrapESSearchService(s.searchService.Query(query))
}

func (s SearchServiceWrapper) FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) es.SearchService {
return WrapESSearchService(s.searchService.FetchSourceContext(fetchSourceContext))
}

// Do calls this function to internal service.
func (s SearchServiceWrapper) Do(ctx context.Context) (*elastic.SearchResult, error) {
return s.searchService.Do(ctx)
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/es/spanstore/internal/dbmodel/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,6 @@ type KeyValue struct {
// Service is the JSON struct for service:operation documents in ElasticSearch
type Service struct {
ServiceName string `json:"serviceName"`
Kind string `json:"spanKind,omitempty"`
OperationName string `json:"operationName"`
}
13 changes: 2 additions & 11 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,20 +306,11 @@ func (s *SpanReader) GetOperations(
currentTime,
cfg.RolloverFrequencyAsNegativeDuration(s.serviceIndex.RolloverFrequency),
)
operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount)
operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, query.SpanKind, s.maxDocCount)
if err != nil {
return nil, err
}

// TODO: https://github.com/jaegertracing/jaeger/issues/1923
// - return the operations with actual span kind that meet requirement
var result []spanstore.Operation
for _, operation := range operations {
result = append(result, spanstore.Operation{
Name: operation,
})
}
return result, err
return operations, err
}

func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, error) {
Expand Down
135 changes: 109 additions & 26 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,33 +627,36 @@ func TestSpanReader_indexWithDate(t *testing.T) {
})
}

type testGetStruct struct {
caption string
searchResult *elastic.SearchResult
searchError error
expectedError func() string
expectedOutput map[string]any
}

func testGet(typ string, t *testing.T) {
testGetWithKind(typ, t, false)
}

func testGetWithKind(typ string, t *testing.T, testKind bool) {
goodAggregations := make(map[string]*json.RawMessage)
rawMessage := []byte(`{"buckets": [{"key": "123","doc_count": 16}]}`)
goodAggregations[typ] = (*json.RawMessage)(&rawMessage)

var filterRawMessage json.RawMessage
if typ == operationsAggregation {
if testKind {
filterRawMessage = rawMessage
}
goodAggregations[typ] = &filterRawMessage
} else {
goodAggregations[typ] = (*json.RawMessage)(&rawMessage)
}
badAggregations := make(map[string]*json.RawMessage)
badRawMessage := []byte(`{"buckets": [{bad json]}asdf`)
badAggregations[typ] = (*json.RawMessage)(&badRawMessage)

testCases := []struct {
caption string
searchResult *elastic.SearchResult
searchError error
expectedError func() string
expectedOutput map[string]any
}{
{
caption: typ + " full behavior",
searchResult: &elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)},
expectedOutput: map[string]any{
operationsAggregation: []spanstore.Operation{{Name: "123"}},
"default": []string{"123"},
},
expectedError: func() string {
return ""
},
},
testCases := []testGetStruct{
{
caption: typ + " search error",
searchError: errors.New("Search failure"),
Expand All @@ -664,21 +667,86 @@ func testGet(typ string, t *testing.T) {
return "search services failed: Search failure"
},
},
{
}

if (typ == operationsAggregation && testKind) || (typ != operationsAggregation) {
testCase := testGetStruct{
caption: typ + " search error",
searchResult: &elastic.SearchResult{Aggregations: elastic.Aggregations(badAggregations)},
expectedError: func() string {
return "could not find aggregation of " + typ
},
},
}
testCases = append(testCases, testCase)
}

if testKind {
testCases = append(testCases, testGetStruct{
caption: typ + " full behavior with kind",
searchResult: &elastic.SearchResult{Aggregations: goodAggregations},
expectedOutput: map[string]any{
operationsAggregation: []spanstore.Operation{{Name: "123", SpanKind: "server"}},
"default": []string{"123"},
},
expectedError: func() string {
return ""
},
})
}

if typ == operationsAggregation && !testKind {
score := 0.6931471
msg := json.RawMessage(`{"operationName": "123"}`)
hitModel := &elastic.SearchHits{
TotalHits: 1,
MaxScore: &score,
Hits: []*elastic.SearchHit{
{
Score: &score,
SeqNo: nil,
Id: "e232b0fbe5cebc85",
PrimaryTerm: nil,
Source: &msg,
},
},
}
testCases = append(testCases, testGetStruct{
caption: typ + " full behavior",
searchResult: &elastic.SearchResult{Hits: hitModel},
expectedOutput: map[string]any{
operationsAggregation: []spanstore.Operation{{Name: "123"}},
"default": []string{"123"},
},
expectedError: func() string {
return ""
},
})
}

if typ != operationsAggregation {
testCases = append(testCases, testGetStruct{
caption: typ + " full behavior",
searchResult: &elastic.SearchResult{Aggregations: goodAggregations},
expectedOutput: map[string]any{
operationsAggregation: []spanstore.Operation{{Name: "123"}},
"default": []string{"123"},
},
expectedError: func() string {
return ""
},
})
}

for _, tc := range testCases {
testCase := tc
t.Run(testCase.caption, func(t *testing.T) {
withSpanReader(t, func(r *spanReaderTest) {
mockSearchService(r).Return(testCase.searchResult, testCase.searchError)
actual, err := returnSearchFunc(typ, r)
if testKind {
mockSearchServiceWithSpanKind(r, true).Return(testCase.searchResult, testCase.searchError)
} else {
mockSearchService(r).Return(testCase.searchResult, testCase.searchError)
}
actual, err := returnSearchFunc(typ, r, testKind)
if testCase.expectedError() != "" {
require.EqualError(t, err, testCase.expectedError())
assert.Nil(t, actual)
Expand All @@ -692,11 +760,17 @@ func testGet(typ string, t *testing.T) {
}
}

func returnSearchFunc(typ string, r *spanReaderTest) (any, error) {
func returnSearchFunc(typ string, r *spanReaderTest, testKind bool) (any, error) {
switch typ {
case servicesAggregation:
return r.reader.GetServices(context.Background())
case operationsAggregation:
if testKind {
return r.reader.GetOperations(
context.Background(),
spanstore.OperationQueryParameters{ServiceName: "someservice", SpanKind: "server"},
)
}
return r.reader.GetOperations(
context.Background(),
spanstore.OperationQueryParameters{ServiceName: "someService"},
Expand Down Expand Up @@ -1012,14 +1086,23 @@ func matchTermsAggregation(termsAgg *elastic.TermsAggregation) bool {
}

func mockSearchService(r *spanReaderTest) *mock.Call {
return mockSearchServiceWithSpanKind(r, false)
}

func mockSearchServiceWithSpanKind(r *spanReaderTest, inputHasSpanKind bool) *mock.Call {
searchService := &mocks.SearchService{}
searchService.On("Query", mock.Anything).Return(searchService)
searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService)
searchService.On("Size", mock.MatchedBy(func(size int) bool {
return size == 0 // Aggregations apply size (bucket) limits in their own query objects, and do not apply at the parent query level.
})).Return(searchService)
searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService)
searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService)
searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService)
if inputHasSpanKind {
searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService)
} else {
searchService.On("FetchSourceContext", mock.AnythingOfType("*elastic.FetchSourceContext"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService)
searchService.On("Size", mock.AnythingOfType("int")).Return(searchService)
}
searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService)
r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService)
return searchService.On("Do", mock.Anything)
Expand Down
Loading

0 comments on commit 03a7004

Please sign in to comment.