1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483
* [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483
* [ENHANCEMENT] Compactor: wait for a stable ring at startup, when sharding is enabled. #3484
* [ENHANCEMENT] Store-gateway: added `-blocks-storage.bucket-store.index-header-lazy-loading-enabled` to enable index-header lazy loading (experimental). When enabled, index-headers will be mmap-ed only once required by a query and will be automatically released after `-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout` time of inactivity. #3498
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
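For reference, a minimal sketch of how the new behaviour could be enabled on a store-gateway. The flag names and the 20m default come from this PR; the 30m value is only an illustrative override:

```
# Enable experimental index-header lazy loading and release idle
# index-headers after 30 minutes of inactivity (default: 20m).
-blocks-storage.bucket-store.index-header-lazy-loading-enabled=true
-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout=30m
```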
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -57,3 +57,4 @@ Currently experimental features are:
- Metric relabeling in the distributor.
- Scalable query-frontend (when using query-scheduler)
- Querying store for series, labels APIs (`-querier.query-store-for-labels-enabled`)
- Blocks storage: lazy mmap of block indexes in the store-gateway (`-blocks-storage.bucket-store.index-header-lazy-loading-enabled`)
6 changes: 5 additions & 1 deletion go.mod
@@ -35,6 +35,7 @@ require (
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/memberlist v0.2.2
github.com/json-iterator/go v1.1.10
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lib/pq v1.3.0
github.com/mitchellh/go-wordwrap v1.0.0
github.com/ncw/swift v1.0.50
@@ -52,7 +53,7 @@
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.6.1
github.com/thanos-io/thanos v0.13.1-0.20201030101306-47f9a225cc52
github.com/thanos-io/thanos v0.13.1-0.20201112171553-05fbe15616c7
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
@@ -82,6 +83,9 @@ replace github.com/gocql/gocql => github.com/grafana/gocql v0.0.0-20200605141915
// We can't upgrade to grpc 1.30.0 until go.etcd.io/etcd will support it.
replace google.golang.org/grpc => google.golang.org/grpc v1.29.1

// We can't upgrade until grpc upgrade is unblocked.
replace github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible

// Using a 3rd-party branch for custom dialer - see https://github.com/bradfitz/gomemcache/pull/86
replace github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

57 changes: 20 additions & 37 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/compactor/blocks_cleaner.go
@@ -186,8 +186,8 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
}

// We can safely delete only partial blocks with a deletion mark.
_, err := metadata.ReadDeletionMark(ctx, userBucket, userLogger, blockID.String())
if err == metadata.ErrorDeletionMarkNotFound {
err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{})
if err == metadata.ErrorMarkerNotFound {
continue
}
if err != nil {
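The switch from the typed `ReadDeletionMark` helper to the generic `ReadMarker` also changes the not-found sentinel (`ErrorMarkerNotFound` instead of `ErrorDeletionMarkNotFound`). A minimal sketch of the new pattern, assuming the `objstore.InstrumentedBucketReader` argument type and the go-kit logger used elsewhere in this codebase (the authoritative signature lives in the pinned Thanos version's `pkg/block/metadata`):

```go
package example

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// hasDeletionMark reports whether the given block has a deletion-mark.json,
// using the generic marker API from the newer Thanos version.
func hasDeletionMark(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, blockID string) (bool, error) {
	err := metadata.ReadMarker(ctx, logger, bkt, blockID, &metadata.DeletionMark{})
	if err == metadata.ErrorMarkerNotFound {
		// No marker uploaded for this block: it is not safe to delete.
		return false, nil
	}
	if err != nil {
		// Unreadable or corrupted marker: surface the error to the caller.
		return false, err
	}
	return true, nil
}
```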
39 changes: 23 additions & 16 deletions pkg/compactor/compactor.go
@@ -99,18 +99,19 @@ type Compactor struct {
// If empty, no users are disabled. If not empty, users in the map are disabled (not owned by this compactor).
disabledUsers map[string]struct{}

// Function that creates bucket client and TSDB compactor using the context.
// Function that creates bucket client, TSDB planner and compactor using the context.
// Useful for injecting mock objects from tests.
createBucketClientAndTsdbCompactor func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error)
createDependencies func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error)

// Users scanner, used to discover users from the bucket.
usersScanner *UsersScanner

// Blocks cleaner is responsible to hard delete blocks marked for deletion.
blocksCleaner *BlocksCleaner

// Underlying compactor used to compact TSDB blocks.
// Underlying compactor and planner used to compact TSDB blocks.
tsdbCompactor tsdb.Compactor
tsdbPlanner compact.Planner

// Client used to run operations on the bucket storing blocks.
bucketClient objstore.Bucket
@@ -135,17 +136,22 @@ type Compactor struct {

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
createBucketClientAndTsdbCompactor := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error) {
createDependencies := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create the bucket client")
return nil, nil, nil, errors.Wrap(err, "failed to create the bucket client")
}

compactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMilliseconds(), downsample.NewPool())
return bucketClient, compactor, err
if err != nil {
return nil, nil, nil, err
}

planner := compact.NewTSDBBasedPlanner(logger, compactorCfg.BlockRanges.ToMilliseconds())
return bucketClient, compactor, planner, nil
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, createBucketClientAndTsdbCompactor)
cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, createDependencies)
if err != nil {
return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
}
@@ -158,16 +164,16 @@ func newCompactor(
storageCfg cortex_tsdb.BlocksStorageConfig,
logger log.Logger,
registerer prometheus.Registerer,
createBucketClientAndTsdbCompactor func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error),
createDependencies func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error),
) (*Compactor, error) {
c := &Compactor{
compactorCfg: compactorCfg,
storageCfg: storageCfg,
parentLogger: logger,
logger: log.With(logger, "component", "compactor"),
registerer: registerer,
syncerMetrics: newSyncerMetrics(registerer),
createBucketClientAndTsdbCompactor: createBucketClientAndTsdbCompactor,
compactorCfg: compactorCfg,
storageCfg: storageCfg,
parentLogger: logger,
logger: log.With(logger, "component", "compactor"),
registerer: registerer,
syncerMetrics: newSyncerMetrics(registerer),
createDependencies: createDependencies,

compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_started_total",
@@ -223,7 +229,7 @@ func (c *Compactor) starting(ctx context.Context) error {
var err error

// Create bucket client and compactor.
c.bucketClient, c.tsdbCompactor, err = c.createBucketClientAndTsdbCompactor(ctx)
c.bucketClient, c.tsdbCompactor, c.tsdbPlanner, err = c.createDependencies(ctx)
if err != nil {
return errors.Wrap(err, "failed to initialize compactor objects")
}
@@ -472,6 +478,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
ulogger,
syncer,
grouper,
c.tsdbPlanner,
c.tsdbCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
bucket,
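The new `tsdbPlanner` field carries a Thanos `compact.Planner`, which is now passed to the bucket compactor alongside the TSDB compactor. Judging from the mock added in the tests below, the interface the compactor depends on reduces to a single method; this is a sketch of that shape, not the authoritative Thanos definition:

```go
package example

import (
	"context"

	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// Planner picks, from the block metas sorted by min time, the next group
// of blocks that should be compacted together. Returning an empty slice
// means there is nothing to compact (the behaviour the tests mock).
type Planner interface {
	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
}
```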
53 changes: 32 additions & 21 deletions pkg/compactor/compactor_test.go
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
"gopkg.in/yaml.v2"

@@ -86,7 +87,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient.MockIter("", []string{}, nil)

c, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
defer cleanup()
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -229,7 +230,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket"))

c, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
defer cleanup()
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -385,14 +386,14 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)

c, tsdbCompactor, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
c, _, tsdbPlanner, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
defer cleanup()

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -404,7 +405,7 @@
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

// Ensure a plan has been executed for the blocks of each user.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2)
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)

assert.ElementsMatch(t, []string{
`level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`,
@@ -496,14 +497,14 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)

c, tsdbCompactor, logs, registry, cleanup := prepare(t, cfg, bucketClient)
c, _, tsdbPlanner, logs, registry, cleanup := prepare(t, cfg, bucketClient)
defer cleanup()

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -515,7 +516,7 @@
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

// Only one user's block is compacted.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 1)
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 1)

assert.ElementsMatch(t, []string{
`level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`,
@@ -600,14 +601,14 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec())

c, tsdbCompactor, logs, _, cleanup := prepare(t, cfg, bucketClient)
c, _, tsdbPlanner, logs, _, cleanup := prepare(t, cfg, bucketClient)
defer cleanup()

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -619,7 +620,7 @@
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

// Ensure a plan has been executed for the blocks of each user.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2)
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
@@ -683,18 +684,18 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

c, tsdbCompactor, l, _, cleanup := prepare(t, cfg, bucketClient)
c, _, tsdbPlanner, l, _, cleanup := prepare(t, cfg, bucketClient)
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
defer cleanup()

compactors = append(compactors, c)
logs = append(logs, l)

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
}

// Start all compactors
Expand Down Expand Up @@ -844,7 +845,7 @@ func prepareConfig() Config {
return compactorCfg
}

func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *concurrency.SyncBuffer, prometheus.Gatherer, func()) {
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer, func()) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&storageCfg)

Expand All @@ -858,16 +859,17 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*
}

tsdbCompactor := &tsdbCompactorMock{}
tsdbPlanner := &tsdbPlannerMock{}
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)
registry := prometheus.NewRegistry()

c, err := newCompactor(compactorCfg, storageCfg, logger, registry, func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error) {
return bucketClient, tsdbCompactor, nil
c, err := newCompactor(compactorCfg, storageCfg, logger, registry, func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error) {
return bucketClient, tsdbCompactor, tsdbPlanner, nil
})
require.NoError(t, err)

return c, tsdbCompactor, logs, registry, cleanup
return c, tsdbCompactor, tsdbPlanner, logs, registry, cleanup
}

type tsdbCompactorMock struct {
@@ -889,6 +891,15 @@ func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Blo
return args.Get(0).(ulid.ULID), args.Error(1)
}

type tsdbPlannerMock struct {
mock.Mock
}

func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
args := m.Called(ctx, metasByMinTime)
return args.Get(0).([]*metadata.Meta), args.Error(1)
}

func mockBlockMetaJSON(id string) string {
meta := tsdb.BlockMeta{
Version: 1,
7 changes: 7 additions & 0 deletions pkg/storage/tsdb/config.go
@@ -262,6 +262,11 @@ type BucketStoreConfig struct {
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`

	// Controls whether index-header lazy loading is enabled. This config option is hidden
	// while the feature is experimental.
IndexHeaderLazyLoadingEnabled bool `yaml:"index_header_lazy_loading_enabled" doc:"hidden"`
IndexHeaderLazyLoadingIdleTimeout time.Duration `yaml:"index_header_lazy_loading_idle_timeout" doc:"hidden"`

// Controls what is the ratio of postings offsets store will hold in memory.
// Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings.
// It's meant for setups that want low baseline memory pressure and where less traffic is expected.
@@ -288,6 +293,8 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"Default is 6h, half of the default value for -compactor.deletion-delay.")
f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", store.DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.")
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
}

// Validate the config.
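For completeness, a sketch of the same two options in YAML. The field names come from the struct tags above; the `blocks_storage` / `bucket_store` nesting is assumed from the usual Cortex blocks-storage configuration layout:

```yaml
blocks_storage:
  bucket_store:
    # Experimental: mmap index-headers only once a query needs them.
    index_header_lazy_loading_enabled: true
    # Release idle index-headers after this much inactivity (default: 20m).
    index_header_lazy_loading_idle_timeout: 20m
```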