diff --git a/CHANGELOG.md b/CHANGELOG.md index ebb383375c7..a70e66066f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ instructions below to upgrade your Postgres. * Renamed `cortex_querier_sync_seconds` metric to `cortex_querier_blocks_sync_seconds` * Track `cortex_querier_blocks_sync_seconds` metric for the initial sync too * Fixed race condition +* [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synced. #2026 * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861 * [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921 * [BUGFIX] Reduce memory usage when ingester Push() errors. #1922 diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 5c3c28e0846..3548396ac22 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -100,13 +100,17 @@ tsdb: # TSDB blocks retention in the ingester before a block is removed. This # should be larger than the block_ranges_period and large enough to give # ingesters enough time to discover newly uploaded blocks. - # CLI flag: -experimental.tsdb.retention-period duration + # CLI flag: -experimental.tsdb.retention-period [ retention_period: | default = 6h] - # The frequency at which the ingester shipper look for unshipped TSDB blocks - # and start uploading them to the long-term storage. - # CLI flag: -experimental.tsdb.ship-interval duration - [ship_interval: | default = 30s] + # How frequently the TSDB blocks are scanned and new ones are shipped to + # the storage. 0 means shipping is disabled. + # CLI flag: -experimental.tsdb.ship-interval + [ship_interval: | default = 1m] + + # Maximum number of tenants concurrently shipping blocks to the storage. + # CLI flag: -experimental.tsdb.ship-concurrency + [ship_concurrency: | default = 10] # The bucket store configuration applies to queriers and configure how queriers # iteract with the long-term storage backend. @@ -140,10 +144,13 @@ tsdb: # CLI flag: -experimental.tsdb.bucket-store.max-concurrent [max_concurrent: | default = 20] - # Number of Go routines, per tenant, to use when syncing blocks from the - # long-term storage. + # Maximum number of concurrent tenants syncing blocks. + # CLI flag: -experimental.tsdb.bucket-store.tenant-sync-concurrency + [tenant_sync_concurrency: | default = 10] + + # Maximum number of concurrent blocks syncing per tenant. + # CLI flag: -experimental.tsdb.bucket-store.block-sync-concurrency - [block_sync_concurrency: | default = 20s] + [block_sync_concurrency: | default = 20] # Configures the S3 storage backend. # Required only when "s3" backend has been selected. 
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 309a2e2ce20..dee61ca0247 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -22,7 +22,6 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/shipper" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -34,9 +33,23 @@ const ( errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)" ) +// Shipper is an interface wrapping the Thanos shipper so it can be easily mocked in tests. +type Shipper interface { + Sync(ctx context.Context) (uploaded int, err error) +} + +type userTSDB struct { + *tsdb.DB + + // Thanos shipper used to ship blocks to the storage. + shipper Shipper + shipperCtx context.Context + shipperCancel context.CancelFunc +} + // TSDBState holds data structures used by the TSDB storage engine type TSDBState struct { - dbs map[string]*tsdb.DB // tsdb sharded by userID + dbs map[string]*userTSDB // tsdb sharded by userID bucket objstore.Bucket // Keeps count of in-flight requests @@ -65,7 +78,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, quit: make(chan struct{}), wal: &noopWAL{}, TSDBState: TSDBState{ - dbs: make(map[string]*tsdb.DB), + dbs: make(map[string]*userTSDB), bucket: bucketClient, tsdbMetrics: newTSDBMetrics(registerer), }, @@ -98,6 +111,12 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Now that user states have been created, we can start the lifecycler i.lifecycler.Start() + // Run the blocks shipping in a dedicated goroutine. + if i.cfg.TSDBConfig.ShipInterval > 0 { + i.done.Add(1) + go i.shipBlocksLoop() + } + return i, nil } @@ -371,14 +390,28 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Me return result, nil } -func (i *Ingester) getTSDB(userID string) *tsdb.DB { +func (i *Ingester) getTSDB(userID string) *userTSDB { i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() db, _ := i.TSDBState.dbs[userID] return db } -func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) { +// getTSDBUsers returns all users for which we have a TSDB. The IDs are copied out +// so that the mutex is held for the shortest time possible. +func (i *Ingester) getTSDBUsers() []string { + i.userStatesMtx.RLock() + defer i.userStatesMtx.RUnlock() + + ids := make([]string, 0, len(i.TSDBState.dbs)) + for userID := range i.TSDBState.dbs { + ids = append(ids, userID) + } + + return ids +} + +func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) { db := i.getTSDB(userID) if db != nil { return db, nil @@ -418,7 +451,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) } // createTSDB creates a TSDB for a given userID, and returns the created db. -func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) { +func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { tsdbPromReg := prometheus.NewRegistry() udir := i.cfg.TSDBConfig.BlocksDir(userID) @@ -433,6 +466,10 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) { return nil, err } + userDB := &userTSDB{ + DB: db, + } + // Thanos shipper requires at least 1 external label to be set. 
For this reason, // we set the tenant ID as external label and we'll filter it out when reading // the series from the storage. @@ -445,23 +482,18 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) { // Create a new shipper for this database if i.cfg.TSDBConfig.ShipInterval > 0 { - s := shipper.New(util.Logger, tsdbPromReg, udir, cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), func() labels.Labels { return l }, metadata.ReceiveSource) - i.done.Add(1) - go func() { - defer i.done.Done() - if err := runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error { - if uploaded, err := s.Sync(context.Background()); err != nil { - level.Warn(util.Logger).Log("err", err, "uploaded", uploaded) - } - return nil - }); err != nil { - level.Warn(util.Logger).Log("err", err) - } - }() + userDB.shipper = shipper.New( + util.Logger, + tsdbPromReg, + udir, + cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), + func() labels.Labels { return l }, metadata.ReceiveSource) + + userDB.shipperCtx, userDB.shipperCancel = context.WithCancel(context.Background()) } i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg) - return db, nil + return userDB, nil } func (i *Ingester) closeAllTSDB() { @@ -471,10 +503,10 @@ func (i *Ingester) closeAllTSDB() { wg.Add(len(i.TSDBState.dbs)) // Concurrently close all users TSDB - for userID, db := range i.TSDBState.dbs { + for userID, userDB := range i.TSDBState.dbs { userID := userID - go func(db *tsdb.DB) { + go func(db *userTSDB) { defer wg.Done() if err := db.Close(); err != nil { @@ -489,7 +521,7 @@ func (i *Ingester) closeAllTSDB() { i.userStatesMtx.Lock() delete(i.TSDBState.dbs, userID) i.userStatesMtx.Unlock() - }(db) + }(userDB) } // Wait until all Close() completed @@ -579,3 +611,88 @@ func (i *Ingester) numSeriesInTSDB() float64 { return float64(count) } + +func (i *Ingester) shipBlocksLoop() { + // It's important to add the shipper loop to the "done" wait group, + // because the blocks transfer should start only once it's guaranteed + // there's no shipping ongoing. + defer i.done.Done() + + // Start a goroutine that will cancel all shipper contexts on ingester + // shutdown, so that if there's any shipper sync in progress it will be + // quickly canceled. + go func() { + <-i.quit + + for _, userID := range i.getTSDBUsers() { + if userDB := i.getTSDB(userID); userDB != nil && userDB.shipperCancel != nil { + userDB.shipperCancel() + } + } + }() + + shipTicker := time.NewTicker(i.cfg.TSDBConfig.ShipInterval) + defer shipTicker.Stop() + + for { + select { + case <-shipTicker.C: + i.shipBlocks() + + case <-i.quit: + return + } + } +} + +func (i *Ingester) shipBlocks() { + // Do not ship blocks if the ingester is PENDING or JOINING. It's + // particularly important for the JOINING state because there could + // be a blocks transfer in progress (from another ingester) and if we + // run the shipper in such a state we could end up with race conditions. + if ingesterState := i.lifecycler.GetState(); ingesterState == ring.PENDING || ingesterState == ring.JOINING { + level.Info(util.Logger).Log("msg", "TSDB blocks shipping has been skipped because of the current ingester state", "state", ingesterState) + return + } + + // Create a pool of workers which will synchronize blocks. The pool size + // is limited in order to avoid syncing too many tenants concurrently in + // a large cluster. 
+ workersChan := make(chan string) + wg := &sync.WaitGroup{} + wg.Add(i.cfg.TSDBConfig.ShipConcurrency) + + for j := 0; j < i.cfg.TSDBConfig.ShipConcurrency; j++ { + go func() { + defer wg.Done() + + for userID := range workersChan { + // Get the user's DB. If the user doesn't exist, we skip it. + userDB := i.getTSDB(userID) + if userDB == nil || userDB.shipper == nil { + continue + } + + // Skip if the shipper context has been canceled. + if userDB.shipperCtx.Err() != nil { + continue + } + + // Run the shipper's Sync() to upload unshipped blocks. + if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil { + level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err) + } else { + level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded) + } + } + }() + } + + for _, userID := range i.getTSDBUsers() { + workersChan <- userID + } + close(workersChan) + + // Wait until all workers completed. + wg.Wait() +} diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 3cdf4d67126..b47d4a92130 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -953,3 +954,51 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) { }) } } + +func TestIngester_shipBlocks(t *testing.T) { + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.TSDBConfig.ShipConcurrency = 2 + + // Create ingester + i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil) + require.NoError(t, err) + defer i.Shutdown() + defer cleanup() + + // Wait until it's ACTIVE + test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Create the TSDB for 3 users and then replace the shipper with the mocked one + mocks := []*shipperMock{} + for _, userID := range []string{"user-1", "user-2", "user-3"} { + userDB, err := i.getOrCreateTSDB(userID, false) + require.NoError(t, err) + require.NotNil(t, userDB) + + m := &shipperMock{} + m.On("Sync", mock.Anything).Return(0, nil) + mocks = append(mocks, m) + + userDB.shipper = m + } + + // Ship blocks and assert on the mocked shipper + i.shipBlocks() + + for _, m := range mocks { + m.AssertNumberOfCalls(t, "Sync", 1) + } +} + +type shipperMock struct { + mock.Mock +} + +// Sync mocks Shipper.Sync() +func (m *shipperMock) Sync(ctx context.Context) (uploaded int, err error) { + args := m.Called(ctx) + return args.Int(0), args.Error(1) +} diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 7c4ceddfa3e..30174c969e1 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -16,7 +16,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/shipper" "github.com/cortexproject/cortex/pkg/chunk/encoding" @@ -545,11 +544,11 @@ func (i *Ingester) v2TransferOut(ctx context.Context) error { wg := &sync.WaitGroup{} wg.Add(len(i.TSDBState.dbs)) - for _, db := range i.TSDBState.dbs { - go func(db *tsdb.DB) { + for _, userDB := range 
i.TSDBState.dbs { + go func(db *userTSDB) { defer wg.Done() db.DisableCompactions() - }(db) + }(userDB) } i.userStatesMtx.RUnlock() @@ -634,7 +633,13 @@ func unshippedBlocks(dir string) (map[string][]string, error) { m, err := shipper.ReadMetaFile(filepath.Join(dir, userID)) if err != nil { - return nil, err + if !os.IsNotExist(err) { + return nil, err + } + + // If the meta file doesn't exist, it means the first sync for this + // user hasn't occurred yet, so we consider all blocks unshipped. + m = &shipper.Meta{} } shipped := make(map[string]bool) diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 35929aabd0d..5e6c6c4de7c 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -18,7 +18,7 @@ import ( "google.golang.org/grpc" ) -type userTSDB struct { +type testUserTSDB struct { userID string shipPercent int numBlocks int @@ -26,7 +26,7 @@ type userTSDB struct { unshipped []string } -func createTSDB(t *testing.T, dir string, users []*userTSDB) { +func createTSDB(t *testing.T, dir string, users []*testUserTSDB) { for _, user := range users { os.MkdirAll(filepath.Join(dir, user.userID), 0777) @@ -87,7 +87,7 @@ func TestUnshippedBlocks(t *testing.T) { One of them has all blocks shipped, One of them has no blocks shipped, */ - users := []*userTSDB{ + users := []*testUserTSDB{ { userID: "0", shipPercent: 70, @@ -157,7 +157,7 @@ func TestTransferUser(t *testing.T) { dir, err := ioutil.TempDir("", "tsdb") require.NoError(t, err) - createTSDB(t, dir, []*userTSDB{ + createTSDB(t, dir, []*testUserTSDB{ { userID: "0", shipPercent: 0, diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index d2efba46695..7b20574d38e 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -192,7 +192,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, // Create a pool of workers which will synchronize blocks. The pool size // is limited in order to avoid to concurrently sync a lot of tenants in // a large cluster. - for i := 0; i < u.cfg.BucketStore.BlockSyncConcurrency; i++ { + for i := 0; i < u.cfg.BucketStore.TenantSyncConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/querier/block_store_test.go b/pkg/querier/block_store_test.go index 87f5741789f..7c4b897a211 100644 --- a/pkg/querier/block_store_test.go +++ b/pkg/querier/block_store_test.go @@ -76,7 +76,7 @@ func TestUserStore_InitialSync(t *testing.T) { func TestUserStore_syncUserStores(t *testing.T) { cfg := tsdb.Config{} flagext.DefaultValues(&cfg) - cfg.BucketStore.BlockSyncConcurrency = 2 + cfg.BucketStore.TenantSyncConcurrency = 2 // Disable the sync interval so that there will be no initial sync. 
cfg.BucketStore.SyncInterval = 0 diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 7516085d61a..6401b11eb4d 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -25,17 +25,19 @@ const ( // Validation errors var ( - errUnsupportedBackend = errors.New("unsupported TSDB storage backend") + errUnsupportedBackend = errors.New("unsupported TSDB storage backend") + errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency") ) // Config holds the config information for TSDB storage type Config struct { - Dir string `yaml:"dir"` - BlockRanges DurationList `yaml:"block_ranges_period"` - Retention time.Duration `yaml:"retention_period"` - ShipInterval time.Duration `yaml:"ship_interval"` - Backend string `yaml:"backend"` - BucketStore BucketStoreConfig `yaml:"bucket_store"` + Dir string `yaml:"dir"` + BlockRanges DurationList `yaml:"block_ranges_period"` + Retention time.Duration `yaml:"retention_period"` + ShipInterval time.Duration `yaml:"ship_interval"` + ShipConcurrency int `yaml:"ship_concurrency"` + Backend string `yaml:"backend"` + BucketStore BucketStoreConfig `yaml:"bucket_store"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -95,7 +97,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Dir, "experimental.tsdb.dir", "tsdb", "directory to place all TSDB's into") f.Var(&cfg.BlockRanges, "experimental.tsdb.block-ranges-period", "comma separated list of TSDB block ranges in time.Duration format") f.DurationVar(&cfg.Retention, "experimental.tsdb.retention-period", 6*time.Hour, "TSDB block retention") - f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 30*time.Second, "the frequency at which tsdb blocks are scanned for shipping. 0 means shipping is disabled.") + f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 1*time.Minute, "How frequently the TSDB blocks are scanned and new ones are shipped to the storage. 
0 means shipping is disabled.") + f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.") f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use") f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup") } @@ -106,18 +109,23 @@ func (cfg *Config) Validate() error { return errUnsupportedBackend } + if cfg.ShipInterval > 0 && cfg.ShipConcurrency <= 0 { + return errInvalidShipConcurrency + } + return nil } // BucketStoreConfig holds the config information for Bucket Stores used by the querier type BucketStoreConfig struct { - SyncDir string `yaml:"sync_dir"` - SyncInterval time.Duration `yaml:"sync_interval"` - IndexCacheSizeBytes uint64 `yaml:"index_cache_size_bytes"` - MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` - MaxSampleCount uint64 `yaml:"max_sample_count"` - MaxConcurrent int `yaml:"max_concurrent"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + SyncDir string `yaml:"sync_dir"` + SyncInterval time.Duration `yaml:"sync_interval"` + IndexCacheSizeBytes uint64 `yaml:"index_cache_size_bytes"` + MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` + MaxSampleCount uint64 `yaml:"max_sample_count"` + MaxConcurrent int `yaml:"max_concurrent"` + TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` } // RegisterFlags registers the BucketStore flags @@ -128,7 +136,8 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size of chunk pool in bytes per tenant.") f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples (0 is no limit) per query when loading series from storage.") f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 20, "Max number of concurrent queries to the storage per tenant.") - f.IntVar(&cfg.BlockSyncConcurrency, "experimental.tsdb.bucket-store.block-sync-concurrency", 20, "Number of Go routines to use when syncing blocks from object storage per tenant.") + f.IntVar(&cfg.TenantSyncConcurrency, "experimental.tsdb.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants syncing blocks.") + f.IntVar(&cfg.BlockSyncConcurrency, "experimental.tsdb.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks syncing per tenant.") } // BlocksDir returns the directory path where TSDB blocks and wal should be diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 3d0edd74aa2..c9ad2af9ac4 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -27,12 +27,28 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: nil, }, - "should pass on unknown backend": { + "should fail on unknown backend": { config: Config{ Backend: "unknown", }, expectedErr: errUnsupportedBackend, }, + "should fail on invalid ship concurrency": { + config: Config{ + Backend: "s3", + ShipInterval: time.Minute, + ShipConcurrency: 0, + }, + expectedErr: errInvalidShipConcurrency, + }, + "should pass on invalid ship concurrency when shipping is disabled": { + config: Config{ + Backend: "s3", + ShipInterval: 0, + ShipConcurrency: 0, + }, + expectedErr: nil, + }, } for testName, 
testData := range tests {
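For reviewers who want to try this change out, here is a minimal sketch of how the new and renamed settings fit together in the `tsdb` section of the Cortex configuration, following the YAML layout documented in `docs/operations/blocks-storage.md` above. The values shown are the defaults introduced by this diff; the directory paths and the surrounding keys (`dir`, `backend`, `bucket_store.sync_dir`) are illustrative placeholders, not part of this change.

```yaml
tsdb:
  dir: /tmp/cortex/tsdb              # illustrative path
  backend: s3

  # Scan for new TSDB blocks every minute and ship them to the bucket,
  # processing at most 10 tenants concurrently (0 disables shipping).
  ship_interval: 1m
  ship_concurrency: 10

  bucket_store:
    sync_dir: /tmp/cortex/tsdb-sync  # illustrative path

    # Sync at most 10 tenants concurrently; within each tenant, sync at
    # most 20 blocks concurrently.
    tenant_sync_concurrency: 10
    block_sync_concurrency: 20
```

The equivalent CLI flags are `-experimental.tsdb.ship-interval`, `-experimental.tsdb.ship-concurrency`, `-experimental.tsdb.bucket-store.tenant-sync-concurrency` and `-experimental.tsdb.bucket-store.block-sync-concurrency`. Note that `Config.Validate()` now rejects a non-positive ship concurrency whenever `ship_interval > 0`.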