diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go index ac1f313b965..b73d3c68354 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go @@ -82,11 +82,11 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D } // GetDependencies implements dependencystore.Reader -func (r *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { searchBody := getSearchBody(endTs, lookback) indices := dailyIndices(r.indexPrefix, endTs, lookback) - response, err := r.client.Search(context.Background(), searchBody, defaultDocCount, indices...) + response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...) if err != nil { return nil, err } diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go index cef22308974..c7d05a342ed 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go @@ -86,7 +86,7 @@ func TestGetDependencies(t *testing.T) { }, } store := NewDependencyStore(client, zap.NewNop(), "foo") - dependencies, err := store.GetDependencies(tsNow, time.Hour) + dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.NoError(t, err) assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{ Timestamp: tsNow, @@ -108,7 +108,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) { }, } store := NewDependencyStore(client, zap.NewNop(), "foo") - dependencies, err := store.GetDependencies(tsNow, time.Hour) + dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Contains(t, err.Error(), "invalid character") assert.Nil(t, dependencies) } @@ -120,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) { } store := NewDependencyStore(client, zap.NewNop(), "foo") tsNow := time.Now() - dependencies, err := store.GetDependencies(tsNow, time.Hour) + dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Error(t, err) assert.Nil(t, dependencies) assert.Contains(t, err.Error(), searchErr.Error()) diff --git a/cmd/query/app/grpc_handler.go b/cmd/query/app/grpc_handler.go index b85d6cf891c..6954de45520 100644 --- a/cmd/query/app/grpc_handler.go +++ b/cmd/query/app/grpc_handler.go @@ -165,7 +165,7 @@ func (g *GRPCHandler) GetOperations( func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependenciesRequest) (*api_v2.GetDependenciesResponse, error) { startTime := r.StartTime endTime := r.EndTime - dependencies, err := g.queryService.GetDependencies(startTime, endTime.Sub(startTime)) + dependencies, err := g.queryService.GetDependencies(ctx, startTime, endTime.Sub(startTime)) if err != nil { g.logger.Error("failed to fetch dependencies", zap.Error(err)) return nil, status.Errorf(codes.Internal, "failed to fetch dependencies: %v", err) diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index 9be69cf84c2..445de54ff2e 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -285,7 +285,7 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) { } endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond) - dependencies, err := aH.queryService.GetDependencies(endTs, lookback) + dependencies, err := aH.queryService.GetDependencies(r.Context(), endTs, lookback) if aH.handleError(w, err, http.StatusInternalServerError) { return } diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 55e52b4d6da..c91b52586f8 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -121,8 +121,8 @@ func (qs QueryService) Adjust(trace *model.Trace) (*model.Trace, error) { } // GetDependencies implements dependencystore.Reader.GetDependencies -func (qs QueryService) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - return qs.dependencyReader.GetDependencies(endTs, lookback) +func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + return qs.dependencyReader.GetDependencies(ctx, endTs, lookback) } // InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them. diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 6700df70236..e68482ba435 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -259,7 +259,7 @@ func TestGetDependencies(t *testing.T) { endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) depsMock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1) - actualDependencies, err := qs.GetDependencies(time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration) + actualDependencies, err := qs.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration) assert.NoError(t, err) assert.Equal(t, expectedDependencies, actualDependencies) } diff --git a/plugin/storage/badger/dependencystore/storage.go b/plugin/storage/badger/dependencystore/storage.go index 1b3cd6505a4..b50655fb1fb 100644 --- a/plugin/storage/badger/dependencystore/storage.go +++ b/plugin/storage/badger/dependencystore/storage.go @@ -35,7 +35,7 @@ func NewDependencyStore(store spanstore.Reader) *DependencyStore { } // GetDependencies returns all interservice dependencies, implements DependencyReader -func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { deps := map[string]*model.DependencyLink{} params := &spanstore.TraceQueryParameters{ diff --git a/plugin/storage/badger/dependencystore/storage_test.go b/plugin/storage/badger/dependencystore/storage_test.go index 5dd79d7b5d8..1141ff04e73 100644 --- a/plugin/storage/badger/dependencystore/storage_test.go +++ b/plugin/storage/badger/dependencystore/storage_test.go @@ -15,6 +15,7 @@ package dependencystore_test import ( + "context" "fmt" "io" "testing" @@ -66,7 +67,7 @@ func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, func TestDependencyReader(t *testing.T) { runFactoryTest(t, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) { tid := time.Now() - links, err := dr.GetDependencies(tid, time.Hour) + links, err := dr.GetDependencies(context.Background(), tid, time.Hour) assert.NoError(t, err) assert.Empty(t, links) @@ -94,7 +95,7 @@ func TestDependencyReader(t *testing.T) { assert.NoError(t, err) } } - links, err = dr.GetDependencies(time.Now(), time.Hour) + links, err = dr.GetDependencies(context.Background(), time.Now(), time.Hour) assert.NoError(t, err) assert.NotEmpty(t, links) assert.Equal(t, spans-1, len(links)) // First span does not create a dependency diff --git a/plugin/storage/cassandra/dependencystore/storage.go b/plugin/storage/cassandra/dependencystore/storage.go index e4c76197576..f6ce1d8ad0c 100644 --- a/plugin/storage/cassandra/dependencystore/storage.go +++ b/plugin/storage/cassandra/dependencystore/storage.go @@ -16,6 +16,7 @@ package dependencystore import ( + "context" "errors" "fmt" "time" @@ -106,7 +107,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D } // GetDependencies returns all interservice dependencies -func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { startTs := endTs.Add(-1 * lookback) var query cassandra.Query switch s.version { diff --git a/plugin/storage/cassandra/dependencystore/storage_test.go b/plugin/storage/cassandra/dependencystore/storage_test.go index 36f998dc182..c1d9fbf610d 100644 --- a/plugin/storage/cassandra/dependencystore/storage_test.go +++ b/plugin/storage/cassandra/dependencystore/storage_test.go @@ -16,6 +16,7 @@ package dependencystore import ( + "context" "errors" "strings" "testing" @@ -225,7 +226,7 @@ func TestDependencyStoreGetDependencies(t *testing.T) { s.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query) - deps, err := s.storage.GetDependencies(time.Now(), 48*time.Hour) + deps, err := s.storage.GetDependencies(context.Background(), time.Now(), 48*time.Hour) if testCase.expectedError == "" { assert.NoError(t, err) diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index a2a04aa98da..fc983bbf109 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -37,7 +37,6 @@ const ( // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - ctx context.Context client es.Client logger *zap.Logger indexPrefix string @@ -50,7 +49,6 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string prefix = indexPrefix + "-" } return &DependencyStore{ - ctx: context.Background(), client: client, logger: logger, indexPrefix: prefix + dependencyIndex, @@ -81,13 +79,13 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe } // GetDependencies returns all interservice dependencies -func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { indices := getIndices(s.indexPrefix, endTs, lookback) searchResult, err := s.client.Search(indices...). Size(10000). // the default elasticsearch allowed limit Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). - Do(s.ctx) + Do(ctx) if err != nil { return nil, fmt.Errorf("failed to search for dependencies: %w", err) } diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 1a001cbd802..2b6f460fad4 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -16,6 +16,7 @@ package dependencystore import ( + "context" "encoding/json" "errors" "strings" @@ -171,7 +172,7 @@ func TestGetDependencies(t *testing.T) { searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Do", mock.Anything).Return(testCase.searchResult, testCase.searchError) - actual, err := r.storage.GetDependencies(fixedTime, 24*time.Hour) + actual, err := r.storage.GetDependencies(context.Background(), fixedTime, 24*time.Hour) if testCase.expectedError != "" { assert.EqualError(t, err, testCase.expectedError) assert.Nil(t, actual) diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index 8f84ded370c..d3ebabc6401 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -206,8 +206,8 @@ func (c *grpcClient) WriteSpan(span *model.Span) error { } // GetDependencies returns all interservice dependencies -func (c *grpcClient) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - resp, err := c.depsReaderClient.GetDependencies(context.Background(), &storage_v1.GetDependenciesRequest{ +func (c *grpcClient) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + resp, err := c.depsReaderClient.GetDependencies(ctx, &storage_v1.GetDependenciesRequest{ EndTime: endTs, StartTime: endTs.Add(-lookback), }) diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go index e31c9530a56..3366b05d95c 100644 --- a/plugin/storage/grpc/shared/grpc_client_test.go +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -297,7 +297,7 @@ func TestGRPCClientGetDependencies(t *testing.T) { EndTime: end, }).Return(&storage_v1.GetDependenciesResponse{Dependencies: deps}, nil) - s, err := r.client.GetDependencies(end, lookback) + s, err := r.client.GetDependencies(context.Background(), end, lookback) assert.NoError(t, err) assert.Equal(t, deps, s) }) diff --git a/plugin/storage/grpc/shared/grpc_server.go b/plugin/storage/grpc/shared/grpc_server.go index b4137b15e88..d79df7d6569 100644 --- a/plugin/storage/grpc/shared/grpc_server.go +++ b/plugin/storage/grpc/shared/grpc_server.go @@ -32,7 +32,7 @@ type grpcServer struct { // GetDependencies returns all interservice dependencies func (s *grpcServer) GetDependencies(ctx context.Context, r *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) { - deps, err := s.Impl.DependencyReader().GetDependencies(r.EndTime, r.EndTime.Sub(r.StartTime)) + deps, err := s.Impl.DependencyReader().GetDependencies(ctx, r.EndTime, r.EndTime.Sub(r.StartTime)) if err != nil { return nil, err } diff --git a/plugin/storage/integration/cassandra_test.go b/plugin/storage/integration/cassandra_test.go index b4048af6ef3..fe5983667ae 100644 --- a/plugin/storage/integration/cassandra_test.go +++ b/plugin/storage/integration/cassandra_test.go @@ -16,6 +16,7 @@ package integration import ( + "context" "errors" "os" "testing" @@ -134,7 +135,7 @@ func (s *StorageIntegration) testCassandraGetDependencies(t *testing.T) { } require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected)) s.refresh(t) - actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute) + actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute) assert.NoError(t, err) assert.EqualValues(t, expected, actual) } diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 75dc69d3779..8f5382c05d7 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -364,7 +364,7 @@ func (s *StorageIntegration) testGetDependencies(t *testing.T) { } require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected)) s.refresh(t) - actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute) + actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute) assert.NoError(t, err) assert.EqualValues(t, expected, actual) } diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index d8008a00c6d..fa11dcc9dbb 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -58,7 +58,7 @@ func WithConfiguration(configuration config.Configuration) *Store { } // GetDependencies returns dependencies between services -func (m *Store) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { // deduper used below can modify the spans, so we take an exclusive lock m.Lock() defer m.Unlock() diff --git a/plugin/storage/memory/memory_test.go b/plugin/storage/memory/memory_test.go index e9ab800c798..fb847b6a044 100644 --- a/plugin/storage/memory/memory_test.go +++ b/plugin/storage/memory/memory_test.go @@ -139,7 +139,7 @@ func withMemoryStore(f func(store *Store)) { func TestStoreGetEmptyDependencies(t *testing.T) { withMemoryStore(func(store *Store) { - links, err := store.GetDependencies(time.Now(), time.Hour) + links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour) assert.NoError(t, err) assert.Empty(t, links) }) @@ -151,11 +151,11 @@ func TestStoreGetDependencies(t *testing.T) { assert.NoError(t, store.WriteSpan(childSpan1)) assert.NoError(t, store.WriteSpan(childSpan2)) assert.NoError(t, store.WriteSpan(childSpan2_1)) - links, err := store.GetDependencies(time.Now(), time.Hour) + links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour) assert.NoError(t, err) assert.Empty(t, links) - links, err = store.GetDependencies(time.Unix(0, 0).Add(time.Hour), time.Hour) + links, err = store.GetDependencies(context.Background(), time.Unix(0, 0).Add(time.Hour), time.Hour) assert.NoError(t, err) assert.Equal(t, []model.DependencyLink{{ Parent: "serviceName", diff --git a/storage/dependencystore/interface.go b/storage/dependencystore/interface.go index 20054b97f6a..4a68149862d 100644 --- a/storage/dependencystore/interface.go +++ b/storage/dependencystore/interface.go @@ -16,6 +16,7 @@ package dependencystore import ( + "context" "time" "github.com/jaegertracing/jaeger/model" @@ -28,5 +29,5 @@ type Writer interface { // Reader can load service dependencies from storage. type Reader interface { - GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) + GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) } diff --git a/storage/dependencystore/mocks/Reader.go b/storage/dependencystore/mocks/Reader.go index a5711f8c359..a289fc0d213 100644 --- a/storage/dependencystore/mocks/Reader.go +++ b/storage/dependencystore/mocks/Reader.go @@ -15,10 +15,15 @@ package mocks -import dependencystore "github.com/jaegertracing/jaeger/storage/dependencystore" -import mock "github.com/stretchr/testify/mock" -import model "github.com/jaegertracing/jaeger/model" -import time "time" +import ( + "context" + "time" + + "github.com/stretchr/testify/mock" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/dependencystore" +) // Reader is an autogenerated mock type for the Reader type type Reader struct { @@ -26,12 +31,12 @@ type Reader struct { } // GetDependencies provides a mock function with given fields: endTs, lookback -func (_m *Reader) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (_m *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { ret := _m.Called(endTs, lookback) var r0 []model.DependencyLink - if rf, ok := ret.Get(0).(func(time.Time, time.Duration) []model.DependencyLink); ok { - r0 = rf(endTs, lookback) + if rf, ok := ret.Get(0).(func(context.Context, time.Time, time.Duration) []model.DependencyLink); ok { + r0 = rf(ctx, endTs, lookback) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]model.DependencyLink) @@ -39,8 +44,8 @@ func (_m *Reader) GetDependencies(endTs time.Time, lookback time.Duration) ([]mo } var r1 error - if rf, ok := ret.Get(1).(func(time.Time, time.Duration) error); ok { - r1 = rf(endTs, lookback) + if rf, ok := ret.Get(1).(func(context.Context, time.Time, time.Duration) error); ok { + r1 = rf(ctx, endTs, lookback) } else { r1 = ret.Error(1) }