Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context to dependencies endpoint #2434

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

// GetDependencies implements dependencystore.Reader
func (r *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchBody := getSearchBody(endTs, lookback)

indices := dailyIndices(r.indexPrefix, endTs, lookback)
response, err := r.client.Search(context.Background(), searchBody, defaultDocCount, indices...)
response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestGetDependencies(t *testing.T) {
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.NoError(t, err)
assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{
Timestamp: tsNow,
Expand All @@ -108,7 +108,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) {
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Contains(t, err.Error(), "invalid character")
assert.Nil(t, dependencies)
}
Expand All @@ -120,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) {
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
tsNow := time.Now()
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
assert.Nil(t, dependencies)
assert.Contains(t, err.Error(), searchErr.Error())
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (g *GRPCHandler) GetOperations(
func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependenciesRequest) (*api_v2.GetDependenciesResponse, error) {
startTime := r.StartTime
endTime := r.EndTime
dependencies, err := g.queryService.GetDependencies(startTime, endTime.Sub(startTime))
dependencies, err := g.queryService.GetDependencies(ctx, startTime, endTime.Sub(startTime))
if err != nil {
g.logger.Error("failed to fetch dependencies", zap.Error(err))
return nil, status.Errorf(codes.Internal, "failed to fetch dependencies: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
}
endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond)

dependencies, err := aH.queryService.GetDependencies(endTs, lookback)
dependencies, err := aH.queryService.GetDependencies(r.Context(), endTs, lookback)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (qs QueryService) Adjust(trace *model.Trace) (*model.Trace, error) {
}

// GetDependencies implements dependencystore.Reader.GetDependencies
func (qs QueryService) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(endTs, lookback)
func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, endTs, lookback)
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestGetDependencies(t *testing.T) {
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
depsMock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)

actualDependencies, err := qs.GetDependencies(time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration)
actualDependencies, err := qs.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration)
assert.NoError(t, err)
assert.Equal(t, expectedDependencies, actualDependencies)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/badger/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewDependencyStore(store spanstore.Reader) *DependencyStore {
}

// GetDependencies returns all interservice dependencies, implements DependencyReader
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
deps := map[string]*model.DependencyLink{}

params := &spanstore.TraceQueryParameters{
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/badger/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dependencystore_test

import (
"context"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -66,7 +67,7 @@ func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer,
func TestDependencyReader(t *testing.T) {
runFactoryTest(t, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) {
tid := time.Now()
links, err := dr.GetDependencies(tid, time.Hour)
links, err := dr.GetDependencies(context.Background(), tid, time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)

Expand Down Expand Up @@ -94,7 +95,7 @@ func TestDependencyReader(t *testing.T) {
assert.NoError(t, err)
}
}
links, err = dr.GetDependencies(time.Now(), time.Hour)
links, err = dr.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.NotEmpty(t, links)
assert.Equal(t, spans-1, len(links)) // First span does not create a dependency
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.version {
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/cassandra/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"errors"
"strings"
"testing"
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestDependencyStoreGetDependencies(t *testing.T) {

s.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query)

deps, err := s.storage.GetDependencies(time.Now(), 48*time.Hour)
deps, err := s.storage.GetDependencies(context.Background(), time.Now(), 48*time.Hour)

if testCase.expectedError == "" {
assert.NoError(t, err)
Expand Down
6 changes: 2 additions & 4 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
client es.Client
logger *zap.Logger
indexPrefix string
Expand All @@ -50,7 +49,6 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string
prefix = indexPrefix + "-"
}
return &DependencyStore{
ctx: context.Background(),
client: client,
logger: logger,
indexPrefix: prefix + dependencyIndex,
Expand Down Expand Up @@ -81,13 +79,13 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, endTs, lookback)
searchResult, err := s.client.Search(indices...).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Do(s.ctx)
Do(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Can we delete the field s.ctx altogether? I think it was implemented as anti-pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted

if err != nil {
return nil, fmt.Errorf("failed to search for dependencies: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"encoding/json"
"errors"
"strings"
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestGetDependencies(t *testing.T) {
searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService)
searchService.On("Do", mock.Anything).Return(testCase.searchResult, testCase.searchError)

actual, err := r.storage.GetDependencies(fixedTime, 24*time.Hour)
actual, err := r.storage.GetDependencies(context.Background(), fixedTime, 24*time.Hour)
if testCase.expectedError != "" {
assert.EqualError(t, err, testCase.expectedError)
assert.Nil(t, actual)
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ func (c *grpcClient) WriteSpan(span *model.Span) error {
}

// GetDependencies returns all interservice dependencies
func (c *grpcClient) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
resp, err := c.depsReaderClient.GetDependencies(context.Background(), &storage_v1.GetDependenciesRequest{
func (c *grpcClient) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
resp, err := c.depsReaderClient.GetDependencies(ctx, &storage_v1.GetDependenciesRequest{
EndTime: endTs,
StartTime: endTs.Add(-lookback),
})
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestGRPCClientGetDependencies(t *testing.T) {
EndTime: end,
}).Return(&storage_v1.GetDependenciesResponse{Dependencies: deps}, nil)

s, err := r.client.GetDependencies(end, lookback)
s, err := r.client.GetDependencies(context.Background(), end, lookback)
assert.NoError(t, err)
assert.Equal(t, deps, s)
})
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type grpcServer struct {

// GetDependencies returns all interservice dependencies
func (s *grpcServer) GetDependencies(ctx context.Context, r *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) {
deps, err := s.Impl.DependencyReader().GetDependencies(r.EndTime, r.EndTime.Sub(r.StartTime))
deps, err := s.Impl.DependencyReader().GetDependencies(ctx, r.EndTime, r.EndTime.Sub(r.StartTime))
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package integration

import (
"context"
"errors"
"os"
"testing"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (s *StorageIntegration) testCassandraGetDependencies(t *testing.T) {
}
require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected))
s.refresh(t)
actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute)
actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
assert.NoError(t, err)
assert.EqualValues(t, expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (s *StorageIntegration) testGetDependencies(t *testing.T) {
}
require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected))
s.refresh(t)
actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute)
actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
assert.NoError(t, err)
assert.EqualValues(t, expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func WithConfiguration(configuration config.Configuration) *Store {
}

// GetDependencies returns dependencies between services
func (m *Store) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
// deduper used below can modify the spans, so we take an exclusive lock
m.Lock()
defer m.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func withMemoryStore(f func(store *Store)) {

func TestStoreGetEmptyDependencies(t *testing.T) {
withMemoryStore(func(store *Store) {
links, err := store.GetDependencies(time.Now(), time.Hour)
links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)
})
Expand All @@ -151,11 +151,11 @@ func TestStoreGetDependencies(t *testing.T) {
assert.NoError(t, store.WriteSpan(childSpan1))
assert.NoError(t, store.WriteSpan(childSpan2))
assert.NoError(t, store.WriteSpan(childSpan2_1))
links, err := store.GetDependencies(time.Now(), time.Hour)
links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)

links, err = store.GetDependencies(time.Unix(0, 0).Add(time.Hour), time.Hour)
links, err = store.GetDependencies(context.Background(), time.Unix(0, 0).Add(time.Hour), time.Hour)
assert.NoError(t, err)
assert.Equal(t, []model.DependencyLink{{
Parent: "serviceName",
Expand Down
3 changes: 2 additions & 1 deletion storage/dependencystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"time"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -28,5 +29,5 @@ type Writer interface {

// Reader can load service dependencies from storage.
type Reader interface {
GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
}
23 changes: 14 additions & 9 deletions storage/dependencystore/mocks/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,37 @@

package mocks

import dependencystore "github.com/jaegertracing/jaeger/storage/dependencystore"
import mock "github.com/stretchr/testify/mock"
import model "github.com/jaegertracing/jaeger/model"
import time "time"
import (
"context"
"time"

"github.com/stretchr/testify/mock"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/dependencystore"
)

// Reader is an autogenerated mock type for the Reader type
type Reader struct {
mock.Mock
}

// GetDependencies provides a mock function with given fields: endTs, lookback
func (_m *Reader) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (_m *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
ret := _m.Called(endTs, lookback)

var r0 []model.DependencyLink
if rf, ok := ret.Get(0).(func(time.Time, time.Duration) []model.DependencyLink); ok {
r0 = rf(endTs, lookback)
if rf, ok := ret.Get(0).(func(context.Context, time.Time, time.Duration) []model.DependencyLink); ok {
r0 = rf(ctx, endTs, lookback)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]model.DependencyLink)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(time.Time, time.Duration) error); ok {
r1 = rf(endTs, lookback)
if rf, ok := ret.Get(1).(func(context.Context, time.Time, time.Duration) error); ok {
r1 = rf(ctx, endTs, lookback)
} else {
r1 = ret.Error(1)
}
Expand Down