1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@ instructions below to upgrade your Postgres.
* Renamed `cortex_querier_sync_seconds` metric to `cortex_querier_blocks_sync_seconds`
* Track `cortex_querier_blocks_sync_seconds` metric for the initial sync too
* Fixed race condition
* [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synced. #2026
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
* [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921
* [BUGFIX] Reduce memory usage when ingester Push() errors. #1922
23 changes: 15 additions & 8 deletions docs/operations/blocks-storage.md
@@ -100,13 +100,17 @@ tsdb:
# TSDB blocks retention in the ingester before a block is removed. This
# should be larger than the block_ranges_period and large enough to give
# ingesters enough time to discover newly uploaded blocks.
# CLI flag: -experimental.tsdb.retention-period duration
# CLI flag: -experimental.tsdb.retention-period
[ retention_period: <duration> | default = 6h]

# The frequency at which the ingester shipper look for unshipped TSDB blocks
# and start uploading them to the long-term storage.
# CLI flag: -experimental.tsdb.ship-interval duration
[ship_interval: <duration> | default = 30s]
# How frequently the TSDB blocks are scanned and new ones are shipped to
# the storage. 0 means shipping is disabled.
# CLI flag: -experimental.tsdb.ship-interval
[ship_interval: <duration> | default = 1m]

# Maximum number of tenants concurrently shipping blocks to the storage.
# CLI flag: -experimental.tsdb.ship-concurrency
[ship_concurrency: <int> | default = 10]

# The bucket store configuration applies to queriers and configures how queriers
# interact with the long-term storage backend.
@@ -140,10 +144,13 @@ tsdb:
# CLI flag: -experimental.tsdb.bucket-store.max-concurrent
[max_concurrent: <int> | default = 20]

# Number of Go routines, per tenant, to use when syncing blocks from the
# long-term storage.
# Maximum number of concurrent tenants syncing blocks.
# CLI flag: -experimental.tsdb.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]

# Maximum number of concurrent blocks syncing per tenant.
# CLI flag: -experimental.tsdb.bucket-store.block-sync-concurrency
[block_sync_concurrency: <int> | default = 20s]
[block_sync_concurrency: <int> | default = 20]

# Configures the S3 storage backend.
# Required only when "s3" backend has been selected.
163 changes: 140 additions & 23 deletions pkg/ingester/ingester_v2.go
@@ -22,7 +22,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
@@ -34,9 +33,23 @@ const (
errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"
)

// Shipper interface is used to have an easy way to mock it in tests.
type Shipper interface {
Sync(ctx context.Context) (uploaded int, err error)
}

type userTSDB struct {
*tsdb.DB

// Thanos shipper used to ship blocks to the storage.
shipper Shipper
shipperCtx context.Context
shipperCancel context.CancelFunc
}

// TSDBState holds data structures used by the TSDB storage engine
type TSDBState struct {
dbs map[string]*tsdb.DB // tsdb sharded by userID
dbs map[string]*userTSDB // tsdb sharded by userID
bucket objstore.Bucket

// Keeps count of in-flight requests
@@ -65,7 +78,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
quit: make(chan struct{}),
wal: &noopWAL{},
TSDBState: TSDBState{
dbs: make(map[string]*tsdb.DB),
dbs: make(map[string]*userTSDB),
bucket: bucketClient,
tsdbMetrics: newTSDBMetrics(registerer),
},
@@ -98,6 +111,12 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
// Now that user states have been created, we can start the lifecycler
i.lifecycler.Start()

// Run the blocks shipping in a dedicated go routine.
if i.cfg.TSDBConfig.ShipInterval > 0 {
i.done.Add(1)
go i.shipBlocksLoop()
}

return i, nil
}

@@ -371,14 +390,28 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Me
return result, nil
}

func (i *Ingester) getTSDB(userID string) *tsdb.DB {
func (i *Ingester) getTSDB(userID string) *userTSDB {
i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()
db, _ := i.TSDBState.dbs[userID]
return db
}

func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) {
// List all users for which we have a TSDB. We do it here in order
// to keep the mutex locked for the shortest time possible.
func (i *Ingester) getTSDBUsers() []string {
i.userStatesMtx.RLock()
defer i.userStatesMtx.RUnlock()

ids := make([]string, 0, len(i.TSDBState.dbs))
for userID := range i.TSDBState.dbs {
ids = append(ids, userID)
}

return ids
}

func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) {
db := i.getTSDB(userID)
if db != nil {
return db, nil
@@ -418,7 +451,7 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
}

// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
tsdbPromReg := prometheus.NewRegistry()

udir := i.cfg.TSDBConfig.BlocksDir(userID)
@@ -433,6 +466,10 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
return nil, err
}

userDB := &userTSDB{
DB: db,
}

// Thanos shipper requires at least 1 external label to be set. For this reason,
// we set the tenant ID as external label and we'll filter it out when reading
// the series from the storage.
@@ -445,23 +482,18 @@

// Create a new shipper for this database
if i.cfg.TSDBConfig.ShipInterval > 0 {
s := shipper.New(util.Logger, tsdbPromReg, udir, cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), func() labels.Labels { return l }, metadata.ReceiveSource)
i.done.Add(1)
go func() {
defer i.done.Done()
if err := runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error {
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(util.Logger).Log("err", err, "uploaded", uploaded)
}
return nil
}); err != nil {
level.Warn(util.Logger).Log("err", err)
}
}()
userDB.shipper = shipper.New(
util.Logger,
tsdbPromReg,
udir,
cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket),
func() labels.Labels { return l }, metadata.ReceiveSource)

userDB.shipperCtx, userDB.shipperCancel = context.WithCancel(context.Background())
}

i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg)
return db, nil
return userDB, nil
}

func (i *Ingester) closeAllTSDB() {
@@ -471,10 +503,10 @@ func (i *Ingester) closeAllTSDB() {
wg.Add(len(i.TSDBState.dbs))

// Concurrently close all users TSDB
for userID, db := range i.TSDBState.dbs {
for userID, userDB := range i.TSDBState.dbs {
userID := userID

go func(db *tsdb.DB) {
go func(db *userTSDB) {
defer wg.Done()

if err := db.Close(); err != nil {
@@ -489,7 +521,7 @@
i.userStatesMtx.Lock()
delete(i.TSDBState.dbs, userID)
i.userStatesMtx.Unlock()
}(db)
}(userDB)
}

// Wait until all Close() completed
@@ -579,3 +611,88 @@ func (i *Ingester) numSeriesInTSDB() float64 {

return float64(count)
}

func (i *Ingester) shipBlocksLoop() {
// It's important to add the shipper loop to the "done" wait group,
// because the blocks transfer should start only once it's guaranteed
// there's no shipping on-going.
defer i.done.Done()

// Start a goroutine that will cancel all shipper contexts on ingester
// shutdown, so that if there's any shipper sync in progress it will be
// quickly canceled.
go func() {
<-i.quit

for _, userID := range i.getTSDBUsers() {
if userDB := i.getTSDB(userID); userDB != nil && userDB.shipperCancel != nil {
userDB.shipperCancel()
}
}
}()

shipTicker := time.NewTicker(i.cfg.TSDBConfig.ShipInterval)
defer shipTicker.Stop()

for {
select {
case <-shipTicker.C:
i.shipBlocks()

case <-i.quit:
return
}
}
}

func (i *Ingester) shipBlocks() {
// Do not ship blocks if the ingester is PENDING or JOINING. It's
// particularly important for the JOINING state because there could
// be a blocks transfer in progress (from another ingester) and if we
// run the shipper in such state we could end up with race conditions.
if ingesterState := i.lifecycler.GetState(); ingesterState == ring.PENDING || ingesterState == ring.JOINING {
level.Info(util.Logger).Log("msg", "TSDB blocks shipping has been skipped because of the current ingester state", "state", ingesterState)
return
}

// Create a pool of workers which will synchronize blocks. The pool size
// is limited in order to avoid concurrently syncing a lot of tenants in
// a large cluster.
workersChan := make(chan string)
wg := &sync.WaitGroup{}
wg.Add(i.cfg.TSDBConfig.ShipConcurrency)

for j := 0; j < i.cfg.TSDBConfig.ShipConcurrency; j++ {
go func() {
defer wg.Done()

for userID := range workersChan {
// Get the user's DB. If the user doesn't exist, we skip it.
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
continue
}

// Skip if the shipper context has been canceled.
if userDB.shipperCtx.Err() != nil {
continue
}

// Run the shipper's Sync() to upload unshipped blocks.
if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil {
level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)
} else {
level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded)
}
}
}()
}

for _, userID := range i.getTSDBUsers() {
workersChan <- userID
}
close(workersChan)

// Wait until all workers completed.
wg.Wait()
}
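The shipBlocks() change above fans tenant IDs out to a fixed number of workers over an unbuffered channel, and a WaitGroup blocks until the channel is closed and drained. Below is a minimal standalone sketch of that bounded worker pool pattern; the processTenant body and the tenant names are illustrative stand-ins for the per-tenant shipper Sync() call, not code from this PR.

package main

import (
    "fmt"
    "sync"
)

// processTenant stands in for the real per-tenant work (the shipper Sync call).
func processTenant(userID string) {
    fmt.Println("synced blocks for", userID)
}

// shipAll runs processTenant for every tenant, with at most `concurrency`
// tenants being processed at the same time.
func shipAll(tenants []string, concurrency int) {
    workersChan := make(chan string)
    wg := &sync.WaitGroup{}
    wg.Add(concurrency)

    // Start a fixed number of workers draining the channel.
    for j := 0; j < concurrency; j++ {
        go func() {
            defer wg.Done()
            for userID := range workersChan {
                processTenant(userID)
            }
        }()
    }

    // Feed tenant IDs. The unbuffered channel applies back-pressure, so the
    // producer blocks whenever all workers are busy.
    for _, userID := range tenants {
        workersChan <- userID
    }
    close(workersChan)

    // Closing the channel terminates the workers' range loops; wait for them.
    wg.Wait()
}

func main() {
    shipAll([]string{"user-1", "user-2", "user-3"}, 2)
}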
49 changes: 49 additions & 0 deletions pkg/ingester/ingester_v2_test.go
@@ -22,6 +22,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
@@ -953,3 +954,51 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) {
})
}
}

func TestIngester_shipBlocks(t *testing.T) {
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0
cfg.TSDBConfig.ShipConcurrency = 2

// Create ingester
i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Wait until it's ACTIVE
test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

// Create the TSDB for 3 users and then replace the shipper with the mocked one
mocks := []*shipperMock{}
for _, userID := range []string{"user-1", "user-2", "user-3"} {
userDB, err := i.getOrCreateTSDB(userID, false)
require.NoError(t, err)
require.NotNil(t, userDB)

m := &shipperMock{}
m.On("Sync", mock.Anything).Return(0, nil)
mocks = append(mocks, m)

userDB.shipper = m
}

// Ship blocks and assert on the mocked shipper
i.shipBlocks()

for _, m := range mocks {
m.AssertNumberOfCalls(t, "Sync", 1)
}
}

type shipperMock struct {
mock.Mock
}

// Sync mocks Shipper.Sync()
func (m *shipperMock) Sync(ctx context.Context) (uploaded int, err error) {
args := m.Called(ctx)
return args.Int(0), args.Error(1)
}
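The test above waits for the ingester lifecycler to report ACTIVE by polling its state via test.Poll. For readers unfamiliar with that helper, here is a simplified sketch of what such a poll loop does; it captures only the general idea and is not the cortex test.Poll implementation.

package main

import (
    "errors"
    "fmt"
    "time"
)

// poll repeatedly evaluates check until it returns true or the timeout
// expires, sleeping between attempts.
func poll(timeout, interval time.Duration, check func() bool) error {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if check() {
            return nil
        }
        time.Sleep(interval)
    }
    return errors.New("condition not met before timeout")
}

func main() {
    start := time.Now()

    // Illustrative condition: becomes true after ~5ms, well within the timeout.
    err := poll(100*time.Millisecond, time.Millisecond, func() bool {
        return time.Since(start) > 5*time.Millisecond
    })
    fmt.Println(err) // <nil>
}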
15 changes: 10 additions & 5 deletions pkg/ingester/transfer.go
@@ -16,7 +16,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/shipper"

"github.com/cortexproject/cortex/pkg/chunk/encoding"
@@ -545,11 +544,11 @@ func (i *Ingester) v2TransferOut(ctx context.Context) error {
wg := &sync.WaitGroup{}
wg.Add(len(i.TSDBState.dbs))

for _, db := range i.TSDBState.dbs {
go func(db *tsdb.DB) {
for _, userDB := range i.TSDBState.dbs {
go func(db *userTSDB) {
defer wg.Done()
db.DisableCompactions()
}(db)
}(userDB)
}

i.userStatesMtx.RUnlock()
@@ -634,7 +633,13 @@ func unshippedBlocks(dir string) (map[string][]string, error) {

m, err := shipper.ReadMetaFile(filepath.Join(dir, userID))
if err != nil {
return nil, err
if !os.IsNotExist(err) {
return nil, err
}

// If the meta file doesn't exist, it means the first sync for this
// user didn't occur yet, so we're going to consider all blocks unshipped.
m = &shipper.Meta{}
}

shipped := make(map[string]bool)
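The unshippedBlocks() change above treats a missing shipper meta file as "no blocks shipped yet" instead of failing. A standalone sketch of that tolerate-absence pattern follows; the state file name and struct are illustrative, not the Thanos shipper.Meta used by the PR.

package main

import (
    "encoding/json"
    "fmt"
    "os"
)

// state is an illustrative stand-in for a per-user sync state file.
type state struct {
    Uploaded []string `json:"uploaded"`
}

// readState loads the state file. If the file does not exist yet (e.g. the
// first sync has not happened), it falls back to an empty state instead of
// returning an error; any other error is still propagated.
func readState(path string) (*state, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        if !os.IsNotExist(err) {
            return nil, err
        }
        return &state{}, nil
    }

    s := &state{}
    if err := json.Unmarshal(data, s); err != nil {
        return nil, err
    }
    return s, nil
}

func main() {
    s, err := readState("does-not-exist.json")
    fmt.Println(len(s.Uploaded), err) // 0 <nil>
}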