Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type StoreGateway struct {
gatewayCfg Config
storageCfg mimir_tsdb.BlocksStorageConfig
logger log.Logger
stores *BucketStores
stores Stores
tracker *activitytracker.ActivityTracker

// Ring used for sharding blocks.
Expand Down Expand Up @@ -206,9 +206,17 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi
level.Info(logger).Log("msg", "store-gateway using disabled users", "disabled", gatewayCfg.DisabledTenants)
}

g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, allowedTenants, limits, logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
if err != nil {
return nil, errors.Wrap(err, "create bucket stores")
if gatewayCfg.ParquetEnabled {
level.Info(logger).Log("msg", "store-gateway using parquet block format")
g.stores, err = NewParquetBucketStores(logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
if err != nil {
return nil, errors.Wrap(err, "create parquet bucket stores")
}
} else {
g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, allowedTenants, limits, logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
if err != nil {
return nil, errors.Wrap(err, "create bucket stores")
}
}

g.Service = services.NewBasicService(g.starting, g.running, g.stopping)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/gateway_blocks_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (g *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request) {
}
}

metasMap, deleteMarkerDetails, noCompactMarkerDetails, err := listblocks.LoadMetaFilesAndMarkers(req.Context(), g.stores.bucket, tenantID, showDeleted, time.Time{})
metasMap, deleteMarkerDetails, noCompactMarkerDetails, err := listblocks.LoadMetaFilesAndMarkers(req.Context(), g.stores.(*BucketStores).bucket, tenantID, showDeleted, time.Time{})
if err != nil {
util.WriteTextResponse(w, fmt.Sprintf("Failed to read block metadata: %s", err))
return
Expand Down
8 changes: 4 additions & 4 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
assert.Equal(t, ringNumTokensDefault, len(g.ringLifecycler.GetTokens()))
assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens)

assert.NotNil(t, g.stores.getStore("user-1"))
assert.NotNil(t, g.stores.getStore("user-2"))
assert.Nil(t, g.stores.getStore("user-unknown"))
assert.NotNil(t, g.stores.(*BucketStores).getStore("user-1"))
assert.NotNil(t, g.stores.(*BucketStores).getStore("user-2"))
assert.Nil(t, g.stores.(*BucketStores).getStore("user-unknown"))
})
}
}
Expand Down Expand Up @@ -770,7 +770,7 @@ func TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing
srv := newStoreGatewayTestServer(t, g)

// No sync retries to speed up tests.
g.stores.syncBackoffConfig = backoff.Config{MaxRetries: 1}
g.stores.(*BucketStores).syncBackoffConfig = backoff.Config{MaxRetries: 1}

// Start the store-gateway.
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
Expand Down
61 changes: 61 additions & 0 deletions pkg/storegateway/parquet_bucket_stores.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-License-Identifier: AGPL-3.0-only

package storegateway

import (
"context"

"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
)

type ParquetBucketStores struct {
services.Service

logger log.Logger
reg prometheus.Registerer
}

// NewParquetBucketStores initializes a Parquet implementation of the Stores interface.
func NewParquetBucketStores(
logger log.Logger,
reg prometheus.Registerer,
) (*ParquetBucketStores, error) {

stores := &ParquetBucketStores{
logger: logger,
reg: reg,
}
stores.Service = services.NewIdleService(nil, nil)

return stores, nil
}

func (ss ParquetBucketStores) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
//TODO implement me
panic("implement me")
}

func (ss ParquetBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
//TODO implement me
panic("implement me")
}

func (ss ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
//TODO implement me
panic("implement me")
}

func (ss ParquetBucketStores) SyncBlocks(ctx context.Context) error {
//TODO implement me
panic("implement me")
}

func (ss ParquetBucketStores) scanUsers(ctx context.Context) ([]string, error) {
//TODO implement me
panic("implement me")
}
22 changes: 22 additions & 0 deletions pkg/storegateway/stores.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// SPDX-License-Identifier: AGPL-3.0-only

package storegateway

import (
"context"

"github.com/grafana/dskit/services"

"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
)

type Stores interface {
services.Service
Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error
LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
SyncBlocks(ctx context.Context) error

scanUsers(context.Context) ([]string, error)
}
Loading