Skip to content

Commit

Permalink
Add context.Context to dependencies endpoint (#2434)
Browse files Browse the repository at this point in the history
* Propagate context on dependencies endpoint

Signed-off-by: Yoav Eyal <[email protected]>

* PR

Signed-off-by: Yoav Eyal <[email protected]>

* Run fmt

Signed-off-by: yoave23 <[email protected]>
  • Loading branch information
yoave23 authored Aug 31, 2020
1 parent af985ae commit 5fb5e8a
Show file tree
Hide file tree
Showing 21 changed files with 49 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

// GetDependencies implements dependencystore.Reader
func (r *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchBody := getSearchBody(endTs, lookback)

indices := dailyIndices(r.indexPrefix, endTs, lookback)
response, err := r.client.Search(context.Background(), searchBody, defaultDocCount, indices...)
response, err := r.client.Search(ctx, searchBody, defaultDocCount, indices...)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestGetDependencies(t *testing.T) {
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.NoError(t, err)
assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{
Timestamp: tsNow,
Expand All @@ -108,7 +108,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) {
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Contains(t, err.Error(), "invalid character")
assert.Nil(t, dependencies)
}
Expand All @@ -120,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) {
}
store := NewDependencyStore(client, zap.NewNop(), "foo")
tsNow := time.Now()
dependencies, err := store.GetDependencies(tsNow, time.Hour)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
assert.Nil(t, dependencies)
assert.Contains(t, err.Error(), searchErr.Error())
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (g *GRPCHandler) GetOperations(
func (g *GRPCHandler) GetDependencies(ctx context.Context, r *api_v2.GetDependenciesRequest) (*api_v2.GetDependenciesResponse, error) {
startTime := r.StartTime
endTime := r.EndTime
dependencies, err := g.queryService.GetDependencies(startTime, endTime.Sub(startTime))
dependencies, err := g.queryService.GetDependencies(ctx, startTime, endTime.Sub(startTime))
if err != nil {
g.logger.Error("failed to fetch dependencies", zap.Error(err))
return nil, status.Errorf(codes.Internal, "failed to fetch dependencies: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (aH *APIHandler) dependencies(w http.ResponseWriter, r *http.Request) {
}
endTs := time.Unix(0, 0).Add(time.Duration(endTsMillis) * time.Millisecond)

dependencies, err := aH.queryService.GetDependencies(endTs, lookback)
dependencies, err := aH.queryService.GetDependencies(r.Context(), endTs, lookback)
if aH.handleError(w, err, http.StatusInternalServerError) {
return
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (qs QueryService) Adjust(trace *model.Trace) (*model.Trace, error) {
}

// GetDependencies implements dependencystore.Reader.GetDependencies
func (qs QueryService) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(endTs, lookback)
func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, endTs, lookback)
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestGetDependencies(t *testing.T) {
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
depsMock.On("GetDependencies", endTs, defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)

actualDependencies, err := qs.GetDependencies(time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration)
actualDependencies, err := qs.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration)
assert.NoError(t, err)
assert.Equal(t, expectedDependencies, actualDependencies)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/badger/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewDependencyStore(store spanstore.Reader) *DependencyStore {
}

// GetDependencies returns all interservice dependencies, implements DependencyReader
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
deps := map[string]*model.DependencyLink{}

params := &spanstore.TraceQueryParameters{
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/badger/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dependencystore_test

import (
"context"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -66,7 +67,7 @@ func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer,
func TestDependencyReader(t *testing.T) {
runFactoryTest(t, func(tb testing.TB, sw spanstore.Writer, dr dependencystore.Reader) {
tid := time.Now()
links, err := dr.GetDependencies(tid, time.Hour)
links, err := dr.GetDependencies(context.Background(), tid, time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)

Expand Down Expand Up @@ -94,7 +95,7 @@ func TestDependencyReader(t *testing.T) {
assert.NoError(t, err)
}
}
links, err = dr.GetDependencies(time.Now(), time.Hour)
links, err = dr.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.NotEmpty(t, links)
assert.Equal(t, spans-1, len(links)) // First span does not create a dependency
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -106,7 +107,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.version {
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/cassandra/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"errors"
"strings"
"testing"
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestDependencyStoreGetDependencies(t *testing.T) {

s.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query)

deps, err := s.storage.GetDependencies(time.Now(), 48*time.Hour)
deps, err := s.storage.GetDependencies(context.Background(), time.Now(), 48*time.Hour)

if testCase.expectedError == "" {
assert.NoError(t, err)
Expand Down
6 changes: 2 additions & 4 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
client es.Client
logger *zap.Logger
indexPrefix string
Expand All @@ -50,7 +49,6 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string
prefix = indexPrefix + "-"
}
return &DependencyStore{
ctx: context.Background(),
client: client,
logger: logger,
indexPrefix: prefix + dependencyIndex,
Expand Down Expand Up @@ -81,13 +79,13 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := getIndices(s.indexPrefix, endTs, lookback)
searchResult, err := s.client.Search(indices...).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Do(s.ctx)
Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to search for dependencies: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"encoding/json"
"errors"
"strings"
Expand Down Expand Up @@ -171,7 +172,7 @@ func TestGetDependencies(t *testing.T) {
searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService)
searchService.On("Do", mock.Anything).Return(testCase.searchResult, testCase.searchError)

actual, err := r.storage.GetDependencies(fixedTime, 24*time.Hour)
actual, err := r.storage.GetDependencies(context.Background(), fixedTime, 24*time.Hour)
if testCase.expectedError != "" {
assert.EqualError(t, err, testCase.expectedError)
assert.Nil(t, actual)
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ func (c *grpcClient) WriteSpan(span *model.Span) error {
}

// GetDependencies returns all interservice dependencies
func (c *grpcClient) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
resp, err := c.depsReaderClient.GetDependencies(context.Background(), &storage_v1.GetDependenciesRequest{
func (c *grpcClient) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
resp, err := c.depsReaderClient.GetDependencies(ctx, &storage_v1.GetDependenciesRequest{
EndTime: endTs,
StartTime: endTs.Add(-lookback),
})
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestGRPCClientGetDependencies(t *testing.T) {
EndTime: end,
}).Return(&storage_v1.GetDependenciesResponse{Dependencies: deps}, nil)

s, err := r.client.GetDependencies(end, lookback)
s, err := r.client.GetDependencies(context.Background(), end, lookback)
assert.NoError(t, err)
assert.Equal(t, deps, s)
})
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type grpcServer struct {

// GetDependencies returns all interservice dependencies
func (s *grpcServer) GetDependencies(ctx context.Context, r *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) {
deps, err := s.Impl.DependencyReader().GetDependencies(r.EndTime, r.EndTime.Sub(r.StartTime))
deps, err := s.Impl.DependencyReader().GetDependencies(ctx, r.EndTime, r.EndTime.Sub(r.StartTime))
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package integration

import (
"context"
"errors"
"os"
"testing"
Expand Down Expand Up @@ -134,7 +135,7 @@ func (s *StorageIntegration) testCassandraGetDependencies(t *testing.T) {
}
require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected))
s.refresh(t)
actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute)
actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
assert.NoError(t, err)
assert.EqualValues(t, expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (s *StorageIntegration) testGetDependencies(t *testing.T) {
}
require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected))
s.refresh(t)
actual, err := s.DependencyReader.GetDependencies(time.Now(), 5*time.Minute)
actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute)
assert.NoError(t, err)
assert.EqualValues(t, expected, actual)
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func WithConfiguration(configuration config.Configuration) *Store {
}

// GetDependencies returns dependencies between services
func (m *Store) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
// deduper used below can modify the spans, so we take an exclusive lock
m.Lock()
defer m.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func withMemoryStore(f func(store *Store)) {

func TestStoreGetEmptyDependencies(t *testing.T) {
withMemoryStore(func(store *Store) {
links, err := store.GetDependencies(time.Now(), time.Hour)
links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)
})
Expand All @@ -151,11 +151,11 @@ func TestStoreGetDependencies(t *testing.T) {
assert.NoError(t, store.WriteSpan(childSpan1))
assert.NoError(t, store.WriteSpan(childSpan2))
assert.NoError(t, store.WriteSpan(childSpan2_1))
links, err := store.GetDependencies(time.Now(), time.Hour)
links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour)
assert.NoError(t, err)
assert.Empty(t, links)

links, err = store.GetDependencies(time.Unix(0, 0).Add(time.Hour), time.Hour)
links, err = store.GetDependencies(context.Background(), time.Unix(0, 0).Add(time.Hour), time.Hour)
assert.NoError(t, err)
assert.Equal(t, []model.DependencyLink{{
Parent: "serviceName",
Expand Down
3 changes: 2 additions & 1 deletion storage/dependencystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package dependencystore

import (
"context"
"time"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -28,5 +29,5 @@ type Writer interface {

// Reader can load service dependencies from storage.
type Reader interface {
GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error)
}
23 changes: 14 additions & 9 deletions storage/dependencystore/mocks/Reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,37 @@

package mocks

import dependencystore "github.com/jaegertracing/jaeger/storage/dependencystore"
import mock "github.com/stretchr/testify/mock"
import model "github.com/jaegertracing/jaeger/model"
import time "time"
import (
"context"
"time"

"github.com/stretchr/testify/mock"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/dependencystore"
)

// Reader is an autogenerated mock type for the Reader type
type Reader struct {
mock.Mock
}

// GetDependencies provides a mock function with given fields: endTs, lookback
func (_m *Reader) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
func (_m *Reader) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
ret := _m.Called(endTs, lookback)

var r0 []model.DependencyLink
if rf, ok := ret.Get(0).(func(time.Time, time.Duration) []model.DependencyLink); ok {
r0 = rf(endTs, lookback)
if rf, ok := ret.Get(0).(func(context.Context, time.Time, time.Duration) []model.DependencyLink); ok {
r0 = rf(ctx, endTs, lookback)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]model.DependencyLink)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(time.Time, time.Duration) error); ok {
r1 = rf(endTs, lookback)
if rf, ok := ret.Get(1).(func(context.Context, time.Time, time.Duration) error); ok {
r1 = rf(ctx, endTs, lookback)
} else {
r1 = ret.Error(1)
}
Expand Down

0 comments on commit 5fb5e8a

Please sign in to comment.