25 changes: 22 additions & 3 deletions pkg/storegateway/parquet_bucket_store.go
@@ -587,7 +587,7 @@ func (s *ParquetBucketStore) createLabelsAndChunksIterators(
 	// not support streaming results, the iterator we get from q.Select is
 	// already backed by a slice. So we are not losing as much as it may seem.
 	// We are planning to implement proper streaming.
-	lbls, aggrChunks, err := toLabelsAndAggChunksSlice(chunkSeriesSet, req.SkipChunks)
+	lbls, aggrChunks, err := toLabelsAndAggChunksSlice(chunkSeriesSet, shardSelector, req.SkipChunks)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "error converting parquet series set to labels and chunks slice")
 	}
@@ -1099,14 +1099,18 @@ func (s *ParquetBucketStore) cleanUpUnownedBlocks() error {
 // storage.ChunkSeriesSet and returns them as slices, converting the chunks to
 // storepb.AggrChunk format. If skipChunks is true, the chunks slice will be
 // empty.
-func toLabelsAndAggChunksSlice(chunkSeriesSet storage.ChunkSeriesSet, skipChunks bool) ([]labels.Labels, [][]storepb.AggrChunk, error) {
+func toLabelsAndAggChunksSlice(chunkSeriesSet storage.ChunkSeriesSet, shardSelector *sharding.ShardSelector, skipChunks bool) ([]labels.Labels, [][]storepb.AggrChunk, error) {
 	var seriesLabels []labels.Labels
 	var aggrChunks [][]storepb.AggrChunk

 	for chunkSeriesSet.Next() {
 		chunkSeries := chunkSeriesSet.At()
+		lbls := chunkSeries.Labels()
+		if !shardOwnedUncached(shardSelector, lbls) {
+			continue
+		}

-		seriesLabels = append(seriesLabels, chunkSeries.Labels())
+		seriesLabels = append(seriesLabels, lbls)

 		if skipChunks {
 			continue
@@ -1131,6 +1135,21 @@ func toLabelsAndAggChunksSlice(chunkSeriesSet storage.ChunkSeriesSet, skipChunks

 	return seriesLabels, aggrChunks, chunkSeriesSet.Err()
 }
+
+// shardOwnedUncached checks whether the given labels belong to the shard
+// specified by the shard selector. Unlike shardOwned and friends from the
+// non-Parquet path, this function does not cache hashes, because we don't
+// yet have easy access to a stable identifier for the series in the block
+// to use as a cache key.
+func shardOwnedUncached(shard *sharding.ShardSelector, lset labels.Labels) bool {
+	if shard == nil {
+		return true
+	}
+
+	hash := labels.StableHash(lset)
+	return hash%shard.ShardCount == shard.ShardIndex
+}
+
 func prometheusChunkEncodingToStorePBChunkType(enc chunkenc.Encoding) storepb.Chunk_Encoding {
 	switch enc {
 	case chunkenc.EncXOR:
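The doc comment on `shardOwnedUncached` points at the trade-off: the non-Parquet path memoizes series hashes, while this helper recomputes `labels.StableHash` for every series in the set. Purely as an illustration, here is a minimal sketch of what a cached variant could look like if a stable per-series identifier were available; the `seriesID` parameter and the plain map cache are hypothetical and not Mimir's actual `shardOwned` implementation.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// shardOwnedCached is a hypothetical cached variant: it memoizes the
// StableHash of each series under a stable identifier, so repeated requests
// against the same block avoid re-hashing every label set.
func shardOwnedCached(shardIndex, shardCount, seriesID uint64, lset labels.Labels, cache map[uint64]uint64) bool {
	hash, ok := cache[seriesID]
	if !ok {
		hash = labels.StableHash(lset)
		cache[seriesID] = hash
	}
	return hash%shardCount == shardIndex
}

func main() {
	cache := make(map[uint64]uint64)
	lset := labels.FromStrings("__name__", "up", "job", "demo")

	fmt.Println(shardOwnedCached(0, 3, 42, lset, cache)) // computes and caches the hash
	fmt.Println(shardOwnedCached(0, 3, 42, lset, cache)) // served from the cache
}
```

A real implementation would also need to bound the cache and make it safe for concurrent use, which this sketch ignores.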
74 changes: 72 additions & 2 deletions pkg/storegateway/parquet_bucket_store_test.go
@@ -19,7 +19,8 @@ import (
 	"github.com/grafana/mimir/pkg/storage/bucket"
 	"github.com/grafana/mimir/pkg/storage/bucket/filesystem"
 	"github.com/grafana/mimir/pkg/storage/indexheader"
-	"github.com/grafana/mimir/pkg/storage/tsdb"
+	"github.com/grafana/mimir/pkg/storage/sharding"
+	mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
 	"github.com/grafana/mimir/pkg/storegateway/storepb"
 )

@@ -109,11 +110,80 @@ func TestParquetBucketStore_Queries(t *testing.T) {
 		require.Contains(t, resp.Values, metricName)
 	})

+	t.Run("Series_NonStreaming", func(t *testing.T) {
+		req := &storepb.SeriesRequest{
+			MinTime: 0,
+			MaxTime: 100,
+			Matchers: []storepb.LabelMatcher{
+				{
+					Type:  storepb.LabelMatcher_EQ,
+					Name:  labels.MetricName,
+					Value: metricName,
+				},
+			},
+		}
+		seriesSet, warnings, err := grpcSeries(t, context.Background(), store, req)
+		require.NoError(t, err)
+		require.Empty(t, warnings)
+		require.Len(t, seriesSet, 1)
+	})
+
+	t.Run("Series_Streaming", func(t *testing.T) {
+		req := &storepb.SeriesRequest{
+			MinTime:                  0,
+			MaxTime:                  100,
+			StreamingChunksBatchSize: 10,
+			Matchers: []storepb.LabelMatcher{
+				{
+					Type:  storepb.LabelMatcher_EQ,
+					Name:  labels.MetricName,
+					Value: metricName,
+				},
+			},
+		}
+		seriesSet, warnings, err := grpcSeries(t, context.Background(), store, req)
+		require.NoError(t, err)
+		require.Empty(t, warnings)
+		require.Len(t, seriesSet, 1)
+	})
+
+	t.Run("Series_Sharding", func(t *testing.T) {
+		const shardCount = 3
+
+		seriesResultCount := 0
+		for shardID := range shardCount {
+			shardLabelValue := sharding.FormatShardIDLabelValue(uint64(shardID), shardCount)
+			req := &storepb.SeriesRequest{
+				MinTime:                  0,
+				MaxTime:                  100,
+				StreamingChunksBatchSize: 10,
+				Matchers: []storepb.LabelMatcher{
+					{
+						Type:  storepb.LabelMatcher_EQ,
+						Name:  labels.MetricName,
+						Value: metricName,
+					},
+					{
+						Type:  storepb.LabelMatcher_EQ,
+						Name:  sharding.ShardLabel,
+						Value: shardLabelValue,
+					},
+				},
+			}
+			seriesSet, warnings, err := grpcSeries(t, context.Background(), store, req)
+			require.NoError(t, err)
+			require.Empty(t, warnings)
+			require.LessOrEqual(t, len(seriesSet), 1, "Expected at most one series result per shard")
+			seriesResultCount += len(seriesSet)
+		}
+		require.Equal(t, 1, seriesResultCount, "Expected exactly one series result across all shards")
+	})
+
 }

 func createTestParquetBucketStore(t *testing.T, userID string, bkt objstore.Bucket) *ParquetBucketStore {
 	localDir := t.TempDir()
-	cfg := tsdb.BucketStoreConfig{
+	cfg := mimir_tsdb.BucketStoreConfig{
 		StreamingBatchSize:   1000,
 		BlockSyncConcurrency: 10,
 		IndexHeader: indexheader.Config{
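The Series_Sharding subtest relies on `labels.StableHash` being deterministic: under the hash-modulo rule in `shardOwnedUncached`, any given series is owned by exactly one of the shards, so the per-shard match counts must sum to the unsharded result. A small self-contained sketch of that invariant, using an illustrative label set:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	const shardCount = 3
	lset := labels.FromStrings("__name__", "test_metric", "instance", "a")

	// Apply the same ownership rule as shardOwnedUncached to every shard.
	owners := 0
	for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
		if labels.StableHash(lset)%shardCount == shardIndex {
			owners++
		}
	}
	fmt.Println(owners) // always 1: exactly one shard owns any given series
}
```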
10 changes: 7 additions & 3 deletions pkg/storegateway/parquet_bucket_stores_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/grafana/mimir/pkg/storage/bucket"
 	"github.com/grafana/mimir/pkg/storage/bucket/filesystem"
 	mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
+	"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
 	"github.com/grafana/mimir/pkg/storegateway/storepb"
 	"github.com/grafana/mimir/pkg/util"
 )
@@ -168,7 +169,7 @@ func generateParquetStorageBlock(t *testing.T, storageDir string, bkt objstore.B

 }

-func queryParquetSeries(t *testing.T, stores *ParquetBucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) {
+func queryParquetSeries(t *testing.T, store storegatewaypb.StoreGatewayServer, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) {
 	req := &storepb.SeriesRequest{
 		MinTime: minT,
 		MaxTime: maxT,
@@ -182,8 +183,11 @@ func queryParquetSeries(t *testing.T, stores *ParquetBucketStores, userID, metri
 			Value: metricName,
 		})
 	}
+	return grpcSeries(t, setUserIDToGRPCContext(context.Background(), userID), store, req)
+}

-	srv := newStoreGatewayTestServer(t, stores)
-	seriesSet, warnings, _, _, err := srv.Series(setUserIDToGRPCContext(context.Background(), userID), req)
+func grpcSeries(t *testing.T, ctx context.Context, store storegatewaypb.StoreGatewayServer, req *storepb.SeriesRequest) ([]*storepb.Series, annotations.Annotations, error) {
+	srv := newStoreGatewayTestServer(t, store)
+	seriesSet, warnings, _, _, err := srv.Series(ctx, req)
 	return seriesSet, warnings, err
 }