diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index f609bb3b1aa..ca4f556019b 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -98,7 +98,7 @@ type StoreGateway struct { gatewayCfg Config storageCfg mimir_tsdb.BlocksStorageConfig logger log.Logger - stores *BucketStores + stores Stores tracker *activitytracker.ActivityTracker // Ring used for sharding blocks. @@ -206,9 +206,17 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi level.Info(logger).Log("msg", "store-gateway using disabled users", "disabled", gatewayCfg.DisabledTenants) } - g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, allowedTenants, limits, logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) - if err != nil { - return nil, errors.Wrap(err, "create bucket stores") + if gatewayCfg.ParquetEnabled { + level.Info(logger).Log("msg", "store-gateway using parquet block format") + g.stores, err = NewParquetBucketStores(logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) + if err != nil { + return nil, errors.Wrap(err, "create parquet bucket stores") + } + } else { + g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, allowedTenants, limits, logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) + if err != nil { + return nil, errors.Wrap(err, "create bucket stores") + } } g.Service = services.NewBasicService(g.starting, g.running, g.stopping) diff --git a/pkg/storegateway/gateway_blocks_http.go b/pkg/storegateway/gateway_blocks_http.go index e41802a0d98..47b8875a46b 100644 --- a/pkg/storegateway/gateway_blocks_http.go +++ b/pkg/storegateway/gateway_blocks_http.go @@ -82,7 +82,7 @@ func (g *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request) { } } - metasMap, deleteMarkerDetails, noCompactMarkerDetails, err := listblocks.LoadMetaFilesAndMarkers(req.Context(), g.stores.bucket, tenantID, showDeleted, time.Time{}) + metasMap, deleteMarkerDetails, noCompactMarkerDetails, err := listblocks.LoadMetaFilesAndMarkers(req.Context(), g.stores.(*BucketStores).bucket, tenantID, showDeleted, time.Time{}) if err != nil { util.WriteTextResponse(w, fmt.Sprintf("Failed to read block metadata: %s", err)) return diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 7d4be6f175c..a2933de2175 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -176,9 +176,9 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { assert.Equal(t, ringNumTokensDefault, len(g.ringLifecycler.GetTokens())) assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens) - assert.NotNil(t, g.stores.getStore("user-1")) - assert.NotNil(t, g.stores.getStore("user-2")) - assert.Nil(t, g.stores.getStore("user-unknown")) + assert.NotNil(t, g.stores.(*BucketStores).getStore("user-1")) + assert.NotNil(t, g.stores.(*BucketStores).getStore("user-2")) + assert.Nil(t, g.stores.(*BucketStores).getStore("user-unknown")) }) } } @@ -770,7 +770,7 @@ func TestStoreGateway_SyncShouldKeepPreviousBlocksIfInstanceIsUnhealthyInTheRing srv := newStoreGatewayTestServer(t, g) // No sync retries to speed up tests. - g.stores.syncBackoffConfig = backoff.Config{MaxRetries: 1} + g.stores.(*BucketStores).syncBackoffConfig = backoff.Config{MaxRetries: 1} // Start the store-gateway. require.NoError(t, services.StartAndAwaitRunning(ctx, g)) diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go new file mode 100644 index 00000000000..85415a5f1a9 --- /dev/null +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package storegateway + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" + "github.com/grafana/mimir/pkg/storegateway/storepb" +) + +type ParquetBucketStores struct { + services.Service + + logger log.Logger + reg prometheus.Registerer +} + +// NewParquetBucketStores initializes a Parquet implementation of the Stores interface. +func NewParquetBucketStores( + logger log.Logger, + reg prometheus.Registerer, +) (*ParquetBucketStores, error) { + + stores := &ParquetBucketStores{ + logger: logger, + reg: reg, + } + stores.Service = services.NewIdleService(nil, nil) + + return stores, nil +} + +func (ss ParquetBucketStores) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error { + //TODO implement me + panic("implement me") +} + +func (ss ParquetBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + //TODO implement me + panic("implement me") +} + +func (ss ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + //TODO implement me + panic("implement me") +} + +func (ss ParquetBucketStores) SyncBlocks(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (ss ParquetBucketStores) scanUsers(ctx context.Context) ([]string, error) { + //TODO implement me + panic("implement me") +} diff --git a/pkg/storegateway/stores.go b/pkg/storegateway/stores.go new file mode 100644 index 00000000000..2d0ac294f31 --- /dev/null +++ b/pkg/storegateway/stores.go @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package storegateway + +import ( + "context" + + "github.com/grafana/dskit/services" + + "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" + "github.com/grafana/mimir/pkg/storegateway/storepb" +) + +type Stores interface { + services.Service + Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error + LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) + LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) + SyncBlocks(ctx context.Context) error + + scanUsers(context.Context) ([]string, error) +}