diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go
index 1eaa1bb32883..36e5b7598da9 100644
--- a/pkg/bloomgateway/client.go
+++ b/pkg/bloomgateway/client.go
@@ -116,11 +116,20 @@ type Client interface {
 	FilterChunks(ctx context.Context, tenant string, interval bloomshipper.Interval, blocks []blockWithSeries, plan plan.QueryPlan) ([]*logproto.GroupedChunkRefs, error)
 }
 
+// clientPool is a minimal interface that is satisfied by the JumpHashClientPool.
+// It only exposes the functions that are used by the GatewayClient
+// and is required to mock the JumpHashClientPool in tests.
+type clientPool interface {
+	GetClientFor(string) (ringclient.PoolClient, error)
+	Addr(string) (string, error)
+	Stop()
+}
+
 type GatewayClient struct {
 	cfg         ClientConfig
 	logger      log.Logger
 	metrics     *clientMetrics
-	pool        *JumpHashClientPool
+	pool        clientPool
 	dnsProvider *discovery.DNS
 }
 
@@ -211,8 +220,15 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
 	servers := make([]addrWithGroups, 0, len(blocks))
 	for _, blockWithSeries := range blocks {
 		addr, err := c.pool.Addr(blockWithSeries.block.String())
+
+		// the client should return the full, unfiltered list of chunks instead of an error
 		if err != nil {
-			return nil, errors.Wrapf(err, "server address for block: %s", blockWithSeries.block)
+			level.Error(c.logger).Log("msg", "failed to resolve server address for block", "block", blockWithSeries.block, "err", err)
+			series := make([][]*logproto.GroupedChunkRefs, 0, len(blocks))
+			for i := range blocks {
+				series = append(series, blocks[i].series)
+			}
+			return mergeSeries(series, nil)
 		}
 
 		if idx, found := pos[addr]; found {
diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go
index 8a22edfcb978..88bb72d37b50 100644
--- a/pkg/bloomgateway/client_test.go
+++ b/pkg/bloomgateway/client_test.go
@@ -6,6 +6,7 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/flagext"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
@@ -16,9 +17,19 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 )
 
+// errorMockPool is a clientPool test double whose Addr method always fails.
+// The test constructs it with a nil embedded pool, so only Addr is safe to call.
+type errorMockPool struct {
+	*JumpHashClientPool
+}
+
+// Addr overrides the embedded implementation and always returns an error.
+func (p *errorMockPool) Addr(_ string) (string, error) {
+	return "", errors.New("no server found")
+}
+
 func TestBloomGatewayClient(t *testing.T) {
 	logger := log.NewNopLogger()
-	reg := prometheus.NewRegistry()
 
 	limits := newLimits()
 
@@ -26,6 +37,7 @@ func TestBloomGatewayClient(t *testing.T) {
 	flagext.DefaultValues(&cfg)
 
 	t.Run("FilterChunks returns response", func(t *testing.T) {
+		reg := prometheus.NewRegistry()
 		c, err := NewClient(cfg, limits, reg, logger, nil, false)
 		require.NoError(t, err)
 		expr, err := syntax.ParseExpr(`{foo="bar"}`)
@@ -34,6 +46,33 @@ func TestBloomGatewayClient(t *testing.T) {
 		require.NoError(t, err)
 		require.Equal(t, 0, len(res))
 	})
+
+	t.Run("pool error is suppressed and returns full list of chunks", func(t *testing.T) {
+		reg := prometheus.NewRegistry()
+		c, err := NewClient(cfg, limits, reg, logger, nil, false)
+		require.NoError(t, err)
+		c.pool = &errorMockPool{}
+
+		expected := []*logproto.GroupedChunkRefs{
+			{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1)}},
+			{Fingerprint: 0x9f, Refs: []*logproto.ShortRef{shortRef(0, 1, 2)}},
+			{Fingerprint: 0xa0, Refs: []*logproto.ShortRef{shortRef(0, 1, 3)}},
+			{Fingerprint: 0xff, Refs: []*logproto.ShortRef{shortRef(0, 1, 4)}},
+		}
+
+		blocks := []blockWithSeries{
+			{block: mkBlockRef(0x00, 0x9f), series: expected[0:2]},
+			{block: mkBlockRef(0xa0, 0xff), series: expected[2:4]},
+		}
+		expr, err := syntax.ParseExpr(`{foo="bar"}`)
+		require.NoError(t, err)
+
+		res, err := c.FilterChunks(context.Background(), "tenant", bloomshipper.NewInterval(0, 0), blocks, plan.QueryPlan{AST: expr})
+		require.NoError(t, err)
+		require.Equal(t, 4, len(res))
+
+		require.Equal(t, expected, res)
+	})
 }
 
 func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {