diff --git a/Makefile b/Makefile index 46ed2d13387..e93d4dc3a15 100644 --- a/Makefile +++ b/Makefile @@ -177,6 +177,7 @@ lint: github.com/cortexproject/cortex/pkg/frontend/v2" \ ./pkg/querier/... faillint -paths "github.com/cortexproject/cortex/pkg/querier/..." ./pkg/scheduler/... + faillint -paths "github.com/cortexproject/cortex/pkg/storage/tsdb/..." ./pkg/storage/bucket/... # Validate Kubernetes spec files. Requires: # https://kubeval.instrumenta.dev diff --git a/integration/e2ecortex/storage.go b/integration/e2ecortex/storage.go index db3af46cb6f..2201a9dac75 100644 --- a/integration/e2ecortex/storage.go +++ b/integration/e2ecortex/storage.go @@ -11,7 +11,7 @@ import ( "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" - "github.com/cortexproject/cortex/pkg/storage/backend/s3" + "github.com/cortexproject/cortex/pkg/storage/bucket/s3" "github.com/cortexproject/cortex/pkg/util/flagext" ) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index cb816f5a3c8..e2ad91de8bc 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -16,6 +16,7 @@ import ( "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" @@ -128,7 +129,7 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context) error { func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error { userLogger := util.WithUserID(userID, c.logger) - userBucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient) + userBucket := bucket.NewUserBucketClient(userID, c.bucketClient) ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(userLogger, userBucket, c.cfg.DeletionDelay, c.cfg.MetaSyncConcurrency) @@ -178,7 +179,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string) error { return nil } -func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, userBucket *cortex_tsdb.UserBucketClient, userLogger log.Logger) { +func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, userBucket *bucket.UserBucketClient, userLogger log.Logger) { for blockID, blockErr := range partials { // We can safely delete only blocks which are partial because the meta.json is missing. if blockErr != block.ErrorSyncMetaNotFound { diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 18b07731756..d84f18f6a9b 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -18,7 +18,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/cortexproject/cortex/pkg/storage/backend/filesystem" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/services" ) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index c79aed04bc2..091675b346a 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -23,6 +23,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -156,7 +157,7 @@ type Compactor struct { // NewCompactor makes a new Compactor. func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) { createDependencies := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error) { - bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) + bucketClient, err := bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) if err != nil { return nil, nil, nil, errors.Wrap(err, "failed to create the bucket client") } @@ -459,7 +460,7 @@ func (c *Compactor) compactUsers(ctx context.Context) error { } func (c *Compactor) compactUser(ctx context.Context, userID string) error { - bucket := cortex_tsdb.NewUserBucketClient(userID, c.bucketClient) + bucket := bucket.NewUserBucketClient(userID, c.bucketClient) reg := prometheus.NewRegistry() defer c.syncerMetrics.gatherThanosSyncerMetrics(reg) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index e39cf5ca2a9..d4127d63e3b 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -31,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -122,7 +123,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { t.Parallel() // No user blocks stored in the bucket. - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient) @@ -265,7 +266,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket t.Parallel() // Fail to iterate over the bucket while discovering users. - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient) @@ -415,7 +416,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { t.Parallel() // Mock the bucket to contain two users, each one with one block. - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) @@ -521,7 +522,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { cfg.DeletionDelay = 10 * time.Minute // Delete block after 10 minutes // Mock the bucket to contain two users, each one with one block. - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) @@ -624,7 +625,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni t.Parallel() // Mock the bucket to contain two users, each one with one block. - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) @@ -698,7 +699,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM } // Mock the bucket to contain all users, each one with one block. - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", userIDs, nil) for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D"}, nil) diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index d8f3a8c7829..6e8cd3e2459 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -12,7 +12,8 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/ruler" - "github.com/cortexproject/cortex/pkg/storage/backend/s3" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/s3" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -28,8 +29,8 @@ func TestCortex(t *testing.T) { }, Ingester: ingester.Config{ BlocksStorageConfig: tsdb.BlocksStorageConfig{ - Bucket: tsdb.BucketConfig{ - Backend: tsdb.BackendS3, + Bucket: bucket.Config{ + Backend: bucket.S3, S3: s3.Config{ Endpoint: "localhost", }, @@ -46,8 +47,8 @@ func TestCortex(t *testing.T) { }, }, BlocksStorage: tsdb.BlocksStorageConfig{ - Bucket: tsdb.BucketConfig{ - Backend: tsdb.BackendS3, + Bucket: bucket.Config{ + Backend: bucket.S3, S3: s3.Config{ Endpoint: "localhost", }, diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 7ff6ea229d2..48961935e93 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -29,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" @@ -373,7 +374,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer // NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage. func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) { - bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer) + bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer) if err != nil { return nil, errors.Wrap(err, "failed to create the bucket client") } @@ -426,7 +427,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Special version of ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react // on Flush method and flush all openened TSDBs when called. func NewV2ForFlusher(cfg Config, registerer prometheus.Registerer) (*Ingester, error) { - bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer) + bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", util.Logger, registerer) if err != nil { return nil, errors.Wrap(err, "failed to create the bucket client") } @@ -1259,7 +1260,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { userLogger, tsdbPromReg, udir, - cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), + bucket.NewUserBucketClient(userID, i.TSDBState.bucket), func() labels.Labels { return l }, metadata.ReceiveSource, false, // No need to upload compacted blocks. Cortex compactor takes care of that. diff --git a/pkg/querier/blocks_scanner.go b/pkg/querier/blocks_scanner.go index c88259ed9d1..7053bb0f0a9 100644 --- a/pkg/querier/blocks_scanner.go +++ b/pkg/querier/blocks_scanner.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway" @@ -361,7 +362,7 @@ func (d *BlocksScanner) getOrCreateMetaFetcher(userID string) (block.MetadataFet func (d *BlocksScanner) createMetaFetcher(userID string) (block.MetadataFetcher, objstore.Bucket, *block.IgnoreDeletionMarkFilter, error) { userLogger := util.WithUserID(userID, d.logger) - userBucket := cortex_tsdb.NewUserBucketClient(userID, d.bucketClient) + userBucket := bucket.NewUserBucketClient(userID, d.bucketClient) userReg := prometheus.NewRegistry() // The following filters have been intentionally omitted: diff --git a/pkg/querier/blocks_scanner_test.go b/pkg/querier/blocks_scanner_test.go index c0bffcfae42..00f2df64893 100644 --- a/pkg/querier/blocks_scanner_test.go +++ b/pkg/querier/blocks_scanner_test.go @@ -23,8 +23,8 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" - "github.com/cortexproject/cortex/pkg/storage/backend/filesystem" - cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -86,7 +86,7 @@ func TestBlocksScanner_InitialScanFailure(t *testing.T) { defer os.RemoveAll(cacheDir) //nolint: errcheck ctx := context.Background() - bucket := &cortex_tsdb.BucketClientMock{} + bucket := &bucket.ClientMock{} reg := prometheus.NewPedanticRegistry() cfg := prepareBlocksScannerConfig() @@ -139,7 +139,7 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyTenants(t *testing.T) tenantIDs := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"} // Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket. - bucket := &cortex_tsdb.BucketClientMock{} + bucket := &bucket.ClientMock{} bucket.MockIter("", tenantIDs, nil) for _, tenantID := range tenantIDs { bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() { @@ -177,7 +177,7 @@ func TestBlocksScanner_StopWhileRunningTheInitialScanOnManyBlocks(t *testing.T) } // Mock the bucket to introduce a 1s sleep while syncing each block in the bucket. - bucket := &cortex_tsdb.BucketClientMock{} + bucket := &bucket.ClientMock{} bucket.MockIter("", []string{"user-1"}, nil) bucket.MockIter("user-1/", blockPaths, nil) bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index b127bf08e64..f60ec718227 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -30,6 +30,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway" @@ -153,7 +154,7 @@ func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consist func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) { var stores BlocksStoreSet - bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), storageCfg.Bucket, "querier", logger, reg) + bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, "querier", logger, reg) if err != nil { return nil, errors.Wrap(err, "failed to create bucket client") } diff --git a/pkg/storage/backend/azure/bucket_client.go b/pkg/storage/bucket/azure/bucket_client.go similarity index 100% rename from pkg/storage/backend/azure/bucket_client.go rename to pkg/storage/bucket/azure/bucket_client.go diff --git a/pkg/storage/backend/azure/config.go b/pkg/storage/bucket/azure/config.go similarity index 86% rename from pkg/storage/backend/azure/config.go rename to pkg/storage/bucket/azure/config.go index 547c70075ab..3162d5f7694 100644 --- a/pkg/storage/backend/azure/config.go +++ b/pkg/storage/bucket/azure/config.go @@ -15,12 +15,12 @@ type Config struct { MaxRetries int `yaml:"max_retries"` } -// RegisterFlags registers the flags for TSDB Azure storage +// RegisterFlags registers the flags for Azure storage func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("blocks-storage.", f) + cfg.RegisterFlagsWithPrefix("", f) } -// RegisterFlagsWithPrefix registers the flags for TSDB Azure storage +// RegisterFlagsWithPrefix registers the flags for Azure storage func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.StorageAccountName, prefix+"azure.account-name", "", "Azure storage account name") f.Var(&cfg.StorageAccountKey, prefix+"azure.account-key", "Azure storage account key") diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go new file mode 100644 index 00000000000..6d6805bb10a --- /dev/null +++ b/pkg/storage/bucket/client.go @@ -0,0 +1,132 @@ +package bucket + +import ( + "context" + "errors" + "flag" + "fmt" + "strings" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/storage/bucket/azure" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" + "github.com/cortexproject/cortex/pkg/storage/bucket/gcs" + "github.com/cortexproject/cortex/pkg/storage/bucket/s3" + "github.com/cortexproject/cortex/pkg/storage/bucket/swift" + "github.com/cortexproject/cortex/pkg/util" +) + +const ( + // S3 is the value for the S3 storage backend. + S3 = "s3" + + // GCS is the value for the GCS storage backend. + GCS = "gcs" + + // Azure is the value for the Azure storage backend. + Azure = "azure" + + // Swift is the value for the Openstack Swift storage backend. + Swift = "swift" + + // Filesystem is the value for the filesystem storage backend. + Filesystem = "filesystem" +) + +var ( + supportedBackends = []string{S3, GCS, Azure, Swift, Filesystem} + + ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") +) + +// Config holds configuration for accessing long-term storage. +type Config struct { + Backend string `yaml:"backend"` + // Backends + S3 s3.Config `yaml:"s3"` + GCS gcs.Config `yaml:"gcs"` + Azure azure.Config `yaml:"azure"` + Swift swift.Config `yaml:"swift"` + Filesystem filesystem.Config `yaml:"filesystem"` + + // Not used internally, meant to allow callers to wrap Buckets + // created using this config + Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"` +} + +// RegisterFlags registers the backend storage config. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.S3.RegisterFlagsWithPrefix(prefix, f) + cfg.GCS.RegisterFlagsWithPrefix(prefix, f) + cfg.Azure.RegisterFlagsWithPrefix(prefix, f) + cfg.Swift.RegisterFlagsWithPrefix(prefix, f) + cfg.Filesystem.RegisterFlagsWithPrefix(prefix, f) + + f.StringVar(&cfg.Backend, prefix+"backend", "s3", fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(supportedBackends, ", "))) +} + +func (cfg *Config) Validate() error { + if !util.StringsContain(supportedBackends, cfg.Backend) { + return ErrUnsupportedStorageBackend + } + + if cfg.Backend == S3 { + if err := cfg.S3.Validate(); err != nil { + return err + } + } + + return nil +} + +// NewClient creates a new bucket client based on the configured backend +func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) { + switch cfg.Backend { + case S3: + client, err = s3.NewBucketClient(cfg.S3, name, logger) + case GCS: + client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) + case Azure: + client, err = azure.NewBucketClient(cfg.Azure, name, logger) + case Swift: + client, err = swift.NewBucketClient(cfg.Swift, name, logger) + case Filesystem: + client, err = filesystem.NewBucketClient(cfg.Filesystem) + default: + return nil, ErrUnsupportedStorageBackend + } + + if err != nil { + return nil, err + } + + client = objstore.NewTracingBucket(bucketWithMetrics(client, name, reg)) + + // Wrap the client with any provided middleware + for _, wrap := range cfg.Middlewares { + client, err = wrap(client) + if err != nil { + return nil, err + } + } + + return client, nil +} + +func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket { + if reg == nil { + return bucketClient + } + + return objstore.BucketWithMetrics( + "", // bucket label value + bucketClient, + prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)) +} diff --git a/pkg/storage/tsdb/bucket_client_mock.go b/pkg/storage/bucket/client_mock.go similarity index 67% rename from pkg/storage/tsdb/bucket_client_mock.go rename to pkg/storage/bucket/client_mock.go index e4d01481098..907e1d85f35 100644 --- a/pkg/storage/tsdb/bucket_client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -1,4 +1,4 @@ -package tsdb +package bucket import ( "bytes" @@ -13,42 +13,42 @@ import ( var errObjectDoesNotExist = errors.New("object does not exist") -// BucketClientMock mocks objstore.Bucket -type BucketClientMock struct { +// ClientMock mocks objstore.Bucket +type ClientMock struct { mock.Mock } // Upload mocks objstore.Bucket.Upload() -func (m *BucketClientMock) Upload(ctx context.Context, name string, r io.Reader) error { +func (m *ClientMock) Upload(ctx context.Context, name string, r io.Reader) error { args := m.Called(ctx, name, r) return args.Error(0) } // Delete mocks objstore.Bucket.Delete() -func (m *BucketClientMock) Delete(ctx context.Context, name string) error { +func (m *ClientMock) Delete(ctx context.Context, name string) error { args := m.Called(ctx, name) return args.Error(0) } // Name mocks objstore.Bucket.Name() -func (m *BucketClientMock) Name() string { +func (m *ClientMock) Name() string { return "mock" } // Iter mocks objstore.Bucket.Iter() -func (m *BucketClientMock) Iter(ctx context.Context, dir string, f func(string) error) error { +func (m *ClientMock) Iter(ctx context.Context, dir string, f func(string) error) error { args := m.Called(ctx, dir, f) return args.Error(0) } // MockIter is a convenient method to mock Iter() -func (m *BucketClientMock) MockIter(prefix string, objects []string, err error) { +func (m *ClientMock) MockIter(prefix string, objects []string, err error) { m.MockIterWithCallback(prefix, objects, err, nil) } // MockIterWithCallback is a convenient method to mock Iter() and get a callback called when the Iter // API is called. -func (m *BucketClientMock) MockIterWithCallback(prefix string, objects []string, err error, cb func()) { +func (m *ClientMock) MockIterWithCallback(prefix string, objects []string, err error, cb func()) { m.On("Iter", mock.Anything, prefix, mock.Anything).Return(err).Run(func(args mock.Arguments) { if cb != nil { cb() @@ -65,7 +65,7 @@ func (m *BucketClientMock) MockIterWithCallback(prefix string, objects []string, } // Get mocks objstore.Bucket.Get() -func (m *BucketClientMock) Get(ctx context.Context, name string) (io.ReadCloser, error) { +func (m *ClientMock) Get(ctx context.Context, name string) (io.ReadCloser, error) { args := m.Called(ctx, name) val, err := args.Get(0), args.Error(1) if val == nil { @@ -75,7 +75,7 @@ func (m *BucketClientMock) Get(ctx context.Context, name string) (io.ReadCloser, } // MockGet is a convenient method to mock Get() and Exists() -func (m *BucketClientMock) MockGet(name, content string, err error) { +func (m *ClientMock) MockGet(name, content string, err error) { if content != "" { m.On("Exists", mock.Anything, name).Return(true, err) @@ -92,34 +92,34 @@ func (m *BucketClientMock) MockGet(name, content string, err error) { } } -func (m *BucketClientMock) MockDelete(name string, err error) { +func (m *ClientMock) MockDelete(name string, err error) { m.On("Delete", mock.Anything, name).Return(err) } // GetRange mocks objstore.Bucket.GetRange() -func (m *BucketClientMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { +func (m *ClientMock) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { args := m.Called(ctx, name, off, length) return args.Get(0).(io.ReadCloser), args.Error(1) } // Exists mocks objstore.Bucket.Exists() -func (m *BucketClientMock) Exists(ctx context.Context, name string) (bool, error) { +func (m *ClientMock) Exists(ctx context.Context, name string) (bool, error) { args := m.Called(ctx, name) return args.Bool(0), args.Error(1) } // IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr() -func (m *BucketClientMock) IsObjNotFoundErr(err error) bool { +func (m *ClientMock) IsObjNotFoundErr(err error) bool { return err == errObjectDoesNotExist } // ObjectSize mocks objstore.Bucket.Attributes() -func (m *BucketClientMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { +func (m *ClientMock) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { args := m.Called(ctx, name) return args.Get(0).(objstore.ObjectAttributes), args.Error(1) } // Close mocks objstore.Bucket.Close() -func (m *BucketClientMock) Close() error { +func (m *ClientMock) Close() error { return nil } diff --git a/pkg/storage/tsdb/bucket_client_test.go b/pkg/storage/bucket/client_test.go similarity index 89% rename from pkg/storage/tsdb/bucket_client_test.go rename to pkg/storage/bucket/client_test.go index cd4588aef79..02d13ab9fff 100644 --- a/pkg/storage/tsdb/bucket_client_test.go +++ b/pkg/storage/bucket/client_test.go @@ -1,4 +1,4 @@ -package tsdb +package bucket import ( "context" @@ -47,7 +47,7 @@ backend: unknown ` ) -func TestNewBucketClient(t *testing.T) { +func TestNewClient(t *testing.T) { t.Parallel() tests := map[string]struct { @@ -64,7 +64,7 @@ func TestNewBucketClient(t *testing.T) { }, "should return error on unknown backend": { config: configWithUnknownBackend, - expectedErr: errUnsupportedStorageBackend, + expectedErr: ErrUnsupportedStorageBackend, }, } @@ -73,14 +73,14 @@ func TestNewBucketClient(t *testing.T) { t.Run(testName, func(t *testing.T) { // Load config - cfg := BlocksStorageConfig{} + cfg := Config{} flagext.DefaultValues(&cfg) err := yaml.Unmarshal([]byte(testData.config), &cfg) require.NoError(t, err) // Instance a new bucket client from the config - bucketClient, err := NewBucketClient(context.Background(), cfg.Bucket, "test", util.Logger, nil) + bucketClient, err := NewClient(context.Background(), cfg, "test", util.Logger, nil) require.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { diff --git a/pkg/storage/backend/filesystem/bucket_client.go b/pkg/storage/bucket/filesystem/bucket_client.go similarity index 100% rename from pkg/storage/backend/filesystem/bucket_client.go rename to pkg/storage/bucket/filesystem/bucket_client.go diff --git a/pkg/storage/backend/filesystem/config.go b/pkg/storage/bucket/filesystem/config.go similarity index 65% rename from pkg/storage/backend/filesystem/config.go rename to pkg/storage/bucket/filesystem/config.go index 26a3ebd1c45..923923a0329 100644 --- a/pkg/storage/backend/filesystem/config.go +++ b/pkg/storage/bucket/filesystem/config.go @@ -7,12 +7,12 @@ type Config struct { Directory string `yaml:"dir"` } -// RegisterFlags registers the flags for TSDB filesystem storage +// RegisterFlags registers the flags for filesystem storage func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("blocks-storage.", f) + cfg.RegisterFlagsWithPrefix("", f) } -// RegisterFlagsWithPrefix registers the flags for TSDB filesystem storage with the provided prefix +// RegisterFlagsWithPrefix registers the flags for filesystem storage with the provided prefix func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.Directory, prefix+"filesystem.dir", "", "Local filesystem storage directory.") } diff --git a/pkg/storage/backend/gcs/bucket_client.go b/pkg/storage/bucket/gcs/bucket_client.go similarity index 100% rename from pkg/storage/backend/gcs/bucket_client.go rename to pkg/storage/bucket/gcs/bucket_client.go diff --git a/pkg/storage/backend/gcs/config.go b/pkg/storage/bucket/gcs/config.go similarity index 78% rename from pkg/storage/backend/gcs/config.go rename to pkg/storage/bucket/gcs/config.go index 44eb020b127..3e646a757c4 100644 --- a/pkg/storage/backend/gcs/config.go +++ b/pkg/storage/bucket/gcs/config.go @@ -12,12 +12,12 @@ type Config struct { ServiceAccount flagext.Secret `yaml:"service_account"` } -// RegisterFlags registers the flags for TSDB GCS storage +// RegisterFlags registers the flags for GCS storage func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("blocks-storage.", f) + cfg.RegisterFlagsWithPrefix("", f) } -// RegisterFlagsWithPrefix registers the flags for TSDB GCS storage with the provided prefix +// RegisterFlagsWithPrefix registers the flags for GCS storage with the provided prefix func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.BucketName, prefix+"gcs.bucket-name", "", "GCS bucket name") f.Var(&cfg.ServiceAccount, prefix+"gcs.service-account", "JSON representing either a Google Developers Console client_credentials.json file or a Google Developers service account key file. If empty, fallback to Google default logic.") diff --git a/pkg/storage/backend/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go similarity index 100% rename from pkg/storage/backend/s3/bucket_client.go rename to pkg/storage/bucket/s3/bucket_client.go diff --git a/pkg/storage/backend/s3/config.go b/pkg/storage/bucket/s3/config.go similarity index 91% rename from pkg/storage/backend/s3/config.go rename to pkg/storage/bucket/s3/config.go index a89be513500..96db7e1f0c1 100644 --- a/pkg/storage/backend/s3/config.go +++ b/pkg/storage/bucket/s3/config.go @@ -32,7 +32,7 @@ type HTTPConfig struct { Transport http.RoundTripper `yaml:"-"` } -// RegisterFlagsWithPrefix registers the flags for TSDB s3 storage with the provided prefix +// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix func (cfg *HTTPConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.IdleConnTimeout, prefix+"s3.http.idle-conn-timeout", 90*time.Second, "The time an idle connection will remain idle before closing.") f.DurationVar(&cfg.ResponseHeaderTimeout, prefix+"s3.http.response-header-timeout", 2*time.Minute, "The amount of time the client will wait for a servers response headers.") @@ -51,12 +51,12 @@ type Config struct { HTTP HTTPConfig `yaml:"http"` } -// RegisterFlags registers the flags for TSDB s3 storage with the provided prefix +// RegisterFlags registers the flags for s3 storage with the provided prefix func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("blocks-storage.", f) + cfg.RegisterFlagsWithPrefix("", f) } -// RegisterFlagsWithPrefix registers the flags for TSDB s3 storage with the provided prefix +// RegisterFlagsWithPrefix registers the flags for s3 storage with the provided prefix func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.AccessKeyID, prefix+"s3.access-key-id", "", "S3 access key ID") f.Var(&cfg.SecretAccessKey, prefix+"s3.secret-access-key", "S3 secret access key") diff --git a/pkg/storage/backend/swift/bucket_client.go b/pkg/storage/bucket/swift/bucket_client.go similarity index 100% rename from pkg/storage/backend/swift/bucket_client.go rename to pkg/storage/bucket/swift/bucket_client.go diff --git a/pkg/storage/backend/swift/config.go b/pkg/storage/bucket/swift/config.go similarity index 92% rename from pkg/storage/backend/swift/config.go rename to pkg/storage/bucket/swift/config.go index 13347fb9d98..3bc682af7ed 100644 --- a/pkg/storage/backend/swift/config.go +++ b/pkg/storage/bucket/swift/config.go @@ -22,12 +22,12 @@ type Config struct { ContainerName string `yaml:"container_name"` } -// RegisterFlags registers the flags for TSDB Swift storage +// RegisterFlags registers the flags for Swift storage func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("blocks-storage.", f) + cfg.RegisterFlagsWithPrefix("", f) } -// RegisterFlagsWithPrefix registers the flags for TSDB Swift storage with the provided prefix +// RegisterFlagsWithPrefix registers the flags for Swift storage with the provided prefix func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.AuthURL, prefix+"swift.auth-url", "", "OpenStack Swift authentication URL") f.StringVar(&cfg.Username, prefix+"swift.username", "", "OpenStack Swift username.") diff --git a/pkg/storage/tsdb/user_bucket_client.go b/pkg/storage/bucket/user_bucket_client.go similarity index 99% rename from pkg/storage/tsdb/user_bucket_client.go rename to pkg/storage/bucket/user_bucket_client.go index a06877f6253..27d5b95e76a 100644 --- a/pkg/storage/tsdb/user_bucket_client.go +++ b/pkg/storage/bucket/user_bucket_client.go @@ -1,4 +1,4 @@ -package tsdb +package bucket import ( "context" diff --git a/pkg/storage/tsdb/bucket_client.go b/pkg/storage/tsdb/bucket_client.go deleted file mode 100644 index eeb92aca6a4..00000000000 --- a/pkg/storage/tsdb/bucket_client.go +++ /dev/null @@ -1,60 +0,0 @@ -package tsdb - -import ( - "context" - - "github.com/go-kit/kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/thanos/pkg/objstore" - - "github.com/cortexproject/cortex/pkg/storage/backend/azure" - "github.com/cortexproject/cortex/pkg/storage/backend/filesystem" - "github.com/cortexproject/cortex/pkg/storage/backend/gcs" - "github.com/cortexproject/cortex/pkg/storage/backend/s3" - "github.com/cortexproject/cortex/pkg/storage/backend/swift" -) - -// NewBucketClient creates a new bucket client based on the configured backend -func NewBucketClient(ctx context.Context, cfg BucketConfig, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) { - switch cfg.Backend { - case BackendS3: - client, err = s3.NewBucketClient(cfg.S3, name, logger) - case BackendGCS: - client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger) - case BackendAzure: - client, err = azure.NewBucketClient(cfg.Azure, name, logger) - case BackendSwift: - client, err = swift.NewBucketClient(cfg.Swift, name, logger) - case BackendFilesystem: - client, err = filesystem.NewBucketClient(cfg.Filesystem) - default: - return nil, errUnsupportedStorageBackend - } - - if err != nil { - return nil, err - } - - client = objstore.NewTracingBucket(bucketWithMetrics(client, name, reg)) - - // Wrap the client with any provided middleware - for _, wrap := range cfg.Middlewares { - client, err = wrap(client) - if err != nil { - return nil, err - } - } - - return client, nil -} - -func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket { - if reg == nil { - return bucketClient - } - - return objstore.BucketWithMetrics( - "", // bucket label value - bucketClient, - prometheus.WrapRegistererWith(prometheus.Labels{"component": name}, reg)) -} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 1d8c973e024..6e193937574 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -2,7 +2,6 @@ package tsdb import ( "flag" - "fmt" "path/filepath" "strings" "time" @@ -11,33 +10,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/wal" - "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/store" - "github.com/cortexproject/cortex/pkg/storage/backend/azure" - "github.com/cortexproject/cortex/pkg/storage/backend/filesystem" - "github.com/cortexproject/cortex/pkg/storage/backend/gcs" - "github.com/cortexproject/cortex/pkg/storage/backend/s3" - "github.com/cortexproject/cortex/pkg/storage/backend/swift" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/storage/bucket" ) const ( - // BackendS3 is the value for the S3 storage backend - BackendS3 = "s3" - - // BackendGCS is the value for the GCS storage backend - BackendGCS = "gcs" - - // BackendAzure is the value for the Azure storage backend - BackendAzure = "azure" - - // BackendSwift is the value for the Openstack Swift storage backend - BackendSwift = "swift" - - // BackendFilesystem is the value for the filesystem storge backend - BackendFilesystem = "filesystem" - // TenantIDExternalLabel is the external label containing the tenant ID, // set when shipping blocks to the storage. TenantIDExternalLabel = "__org_id__" @@ -56,9 +34,6 @@ const ( // Validation errors var ( - supportedBackends = []string{BackendS3, BackendGCS, BackendAzure, BackendSwift, BackendFilesystem} - - errUnsupportedStorageBackend = errors.New("unsupported TSDB storage backend") errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency") errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval") @@ -68,25 +43,10 @@ var ( errEmptyBlockranges = errors.New("empty block ranges for TSDB") ) -// BucketConfig holds configuration for accessing long-term storage. -type BucketConfig struct { - Backend string `yaml:"backend"` - // Backends - S3 s3.Config `yaml:"s3"` - GCS gcs.Config `yaml:"gcs"` - Azure azure.Config `yaml:"azure"` - Swift swift.Config `yaml:"swift"` - Filesystem filesystem.Config `yaml:"filesystem"` - - // Not used internally, meant to allow callers to wrap Buckets - // created using this config - Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"` -} - // BlocksStorageConfig holds the config information for the blocks storage. //nolint:golint type BlocksStorageConfig struct { - Bucket BucketConfig `yaml:",inline"` + Bucket bucket.Config `yaml:",inline"` BucketStore BucketStoreConfig `yaml:"bucket_store" doc:"description=This configures how the store-gateway synchronizes blocks stored in the bucket."` TSDB TSDBConfig `yaml:"tsdb"` } @@ -128,38 +88,13 @@ func (d *DurationList) ToMilliseconds() []int64 { return values } -// RegisterFlags registers the TSDB Backend -func (cfg *BucketConfig) RegisterFlags(f *flag.FlagSet) { - cfg.S3.RegisterFlags(f) - cfg.GCS.RegisterFlags(f) - cfg.Azure.RegisterFlags(f) - cfg.Swift.RegisterFlags(f) - cfg.Filesystem.RegisterFlags(f) - - f.StringVar(&cfg.Backend, "blocks-storage.backend", "s3", fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(supportedBackends, ", "))) -} - // RegisterFlags registers the TSDB flags func (cfg *BlocksStorageConfig) RegisterFlags(f *flag.FlagSet) { - cfg.Bucket.RegisterFlags(f) + cfg.Bucket.RegisterFlagsWithPrefix("blocks-storage.", f) cfg.BucketStore.RegisterFlags(f) cfg.TSDB.RegisterFlags(f) } -func (cfg *BucketConfig) Validate() error { - if !util.StringsContain(supportedBackends, cfg.Backend) { - return errUnsupportedStorageBackend - } - - if cfg.Backend == BackendS3 { - if err := cfg.S3.Validate(); err != nil { - return err - } - } - - return nil -} - // Validate the config. func (cfg *BlocksStorageConfig) Validate() error { if err := cfg.Bucket.Validate(); err != nil { diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 0986fd48940..6946f51c6e8 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/util/flagext" ) @@ -33,7 +34,7 @@ func TestConfig_Validate(t *testing.T) { setup: func(cfg *BlocksStorageConfig) { cfg.Bucket.Backend = "unknown" }, - expectedErr: errUnsupportedStorageBackend, + expectedErr: bucket.ErrUnsupportedStorageBackend, }, "should fail on invalid ship concurrency": { setup: func(cfg *BlocksStorageConfig) { diff --git a/pkg/storage/tsdb/users_scanner_test.go b/pkg/storage/tsdb/users_scanner_test.go index b0ebeb310b7..1a8a84a9ce6 100644 --- a/pkg/storage/tsdb/users_scanner_test.go +++ b/pkg/storage/tsdb/users_scanner_test.go @@ -8,10 +8,12 @@ import ( "github.com/go-kit/kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/storage/bucket" ) func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { - bucketClient := &BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{"user-1", "user-2", "user-3"}, nil) isOwned := func(userID string) (bool, error) { @@ -28,7 +30,7 @@ func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) { func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckFailed(t *testing.T) { expected := []string{"user-1", "user-2"} - bucketClient := &BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", expected, nil) isOwned := func(userID string) (bool, error) { diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 53714819ed8..cecd68e1971 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -26,6 +26,7 @@ import ( "github.com/weaveworks/common/logging" "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" @@ -327,7 +328,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro level.Info(userLogger).Log("msg", "creating user bucket store") - userBkt := tsdb.NewUserBucketClient(userID, u.bucket) + userBkt := bucket.NewUserBucketClient(userID, u.bucket) // Wrap the bucket reader to skip iterating the bucket at all if the user doesn't // belong to the store-gateway shard. We need to run the BucketStore synching anyway diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index b36fc71dc48..75c00b70d6e 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -27,7 +27,8 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc/metadata" - "github.com/cortexproject/cortex/pkg/storage/backend/filesystem" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/flagext" ) @@ -210,7 +211,7 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { cfg.BucketStore.TenantSyncConcurrency = 2 defer cleanup() - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", allUsers, nil) stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index fe6bff4f6c9..f94e08d43f9 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" @@ -353,7 +354,7 @@ func (g *StoreGateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring. } func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { - bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg) + bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg) if err != nil { return nil, errors.Wrap(err, "create bucket client") } diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index ff2b0718052..40af568b00c 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -30,7 +30,8 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" - "github.com/cortexproject/cortex/pkg/storage/backend/filesystem" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -123,7 +124,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { storageCfg, cleanup := mockStorageConfig(t) defer cleanup() ringStore := consul.NewInMemoryClient(ring.GetCodec()) - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} // Setup the initial instance state in the ring. if testData.initialExists { @@ -171,7 +172,7 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) { gatewayCfg.ShardingEnabled = false storageCfg, cleanup := mockStorageConfig(t) defer cleanup() - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) require.NoError(t, err) @@ -194,7 +195,7 @@ func TestStoreGateway_InitialSyncFailure(t *testing.T) { storageCfg, cleanup := mockStorageConfig(t) defer cleanup() ringStore := consul.NewInMemoryClient(ring.GetCodec()) - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) require.NoError(t, err) @@ -407,7 +408,7 @@ func TestStoreGateway_ShouldSupportLoadRingTokensFromFile(t *testing.T) { storageCfg, cleanup := mockStorageConfig(t) defer cleanup() ringStore := consul.NewInMemoryClient(ring.GetCodec()) - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) @@ -533,7 +534,7 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) { reg := prometheus.NewPedanticRegistry() ringStore := consul.NewInMemoryClient(ring.GetCodec()) - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) @@ -593,7 +594,7 @@ func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testin defer cleanup() ringStore := consul.NewInMemoryClient(ring.GetCodec()) - bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) diff --git a/tools/blocksconvert/builder/builder.go b/tools/blocksconvert/builder/builder.go index 05ffa4c419b..3caa588f44a 100644 --- a/tools/blocksconvert/builder/builder.go +++ b/tools/blocksconvert/builder/builder.go @@ -23,6 +23,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" @@ -229,7 +230,7 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c p.builder.blocksSize.Add(float64(blockSize)) if p.builder.cfg.UploadBlock { - userBucket := cortex_tsdb.NewUserBucketClient(p.userID, p.builder.bucketClient) + userBucket := bucket.NewUserBucketClient(p.userID, p.builder.bucketClient) err := uploadBlock(ctx, p.log, userBucket, blockDir) if err != nil { @@ -249,7 +250,7 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c return ulid.String(), nil } -func uploadBlock(ctx context.Context, planLog log.Logger, userBucket *cortex_tsdb.UserBucketClient, blockDir string) error { +func uploadBlock(ctx context.Context, planLog log.Logger, userBucket *bucket.UserBucketClient, blockDir string) error { boff := util.NewBackoff(ctx, util.BackoffConfig{ MinBackoff: 1 * time.Second, MaxBackoff: 5 * time.Second, diff --git a/tools/blocksconvert/shared_config.go b/tools/blocksconvert/shared_config.go index 3e67ac614fc..02fed9c2e98 100644 --- a/tools/blocksconvert/shared_config.go +++ b/tools/blocksconvert/shared_config.go @@ -11,20 +11,20 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/storage" - "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/bucket" ) type SharedConfig struct { SchemaConfig chunk.SchemaConfig // Flags registered by main.go StorageConfig storage.Config - Bucket tsdb.BucketConfig + Bucket bucket.Config BucketPrefix string } func (cfg *SharedConfig) RegisterFlags(f *flag.FlagSet) { cfg.SchemaConfig.RegisterFlags(f) - cfg.Bucket.RegisterFlags(f) + cfg.Bucket.RegisterFlagsWithPrefix("blocks-storage.", f) cfg.StorageConfig.RegisterFlags(f) f.StringVar(&cfg.BucketPrefix, "blocksconvert.bucket-prefix", "migration", "Prefix in the bucket for storing plan files.") @@ -35,7 +35,7 @@ func (cfg *SharedConfig) GetBucket(l log.Logger, reg prometheus.Registerer) (obj return nil, errors.Wrap(err, "invalid bucket config") } - bucket, err := tsdb.NewBucketClient(context.Background(), cfg.Bucket, "bucket", l, reg) + bucket, err := bucket.NewClient(context.Background(), cfg.Bucket, "bucket", l, reg) if err != nil { return nil, errors.Wrap(err, "failed to create bucket") }