1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* [ENHANCEMENT] Compactor: concurrently run blocks cleaner for multiple tenants. Concurrency can be configured via `-compactor.cleanup-concurrency`. #3483
* [ENHANCEMENT] Compactor: shuffle tenants before running compaction. #3483
* [ENHANCEMENT] Compactor: wait for a stable ring at startup, when sharding is enabled. #3484
* [ENHANCEMENT] Store-gateway: added `-blocks-storage.bucket-store.index-header-lazy-loading-enabled` to enable index-header lazy loading (experimental). When enabled, index-headers will be mmap-ed only once they are required by a query and will be automatically released after `-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout` of inactivity. #3498
* [BUGFIX] Blocks storage ingester: fixed some cases leading to a TSDB WAL corruption after a partial write to disk. #3423
* [BUGFIX] Blocks storage: Fix the race between ingestion and `/flush` call resulting in overlapping blocks. #3422
* [BUGFIX] Querier: fixed `-querier.max-query-into-future` which wasn't correctly enforced on range queries. #3452
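To make the new changelog entry above concrete: a store-gateway could opt into the experimental behaviour with the two flags introduced in this PR. A minimal sketch (the `-target=store-gateway` invocation and the 60m value are illustrative, not part of this diff):

cortex -target=store-gateway \
  -blocks-storage.bucket-store.index-header-lazy-loading-enabled=true \
  -blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout=60m

With these settings an index-header is mmap-ed the first time a query needs it and released again after 60 minutes of inactivity (the timeout flag defaults to 20m).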
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
@@ -57,3 +57,4 @@ Currently experimental features are:
- Metric relabeling in the distributor.
- Scalable query-frontend (when using query-scheduler)
- Querying store for series, labels APIs (`-querier.query-store-for-labels-enabled`)
- Blocks storage: lazy mmap of block indexes in the store-gateway (`-blocks-storage.bucket-store.index-header-lazy-loading-enabled`)
6 changes: 5 additions & 1 deletion go.mod
@@ -35,6 +35,7 @@ require (
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/memberlist v0.2.2
github.com/json-iterator/go v1.1.10
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lib/pq v1.3.0
github.com/mitchellh/go-wordwrap v1.0.0
github.com/ncw/swift v1.0.50
@@ -52,7 +53,7 @@
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
github.com/stretchr/testify v1.6.1
github.com/thanos-io/thanos v0.13.1-0.20201030101306-47f9a225cc52
github.com/thanos-io/thanos v0.13.1-0.20201112171553-05fbe15616c7
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
@@ -82,6 +83,9 @@ replace github.com/gocql/gocql => github.com/grafana/gocql v0.0.0-20200605141915
// We can't upgrade to grpc 1.30.0 until go.etcd.io/etcd will support it.
replace google.golang.org/grpc => google.golang.org/grpc v1.29.1

// We can't upgrade until grpc upgrade is unblocked.
replace github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible

// Using a 3rd-party branch for custom dialer - see https://github.com/bradfitz/gomemcache/pull/86
replace github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

57 changes: 20 additions & 37 deletions go.sum

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/compactor/blocks_cleaner.go
@@ -186,8 +186,8 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
}

// We can safely delete only partial blocks with a deletion mark.
_, err := metadata.ReadDeletionMark(ctx, userBucket, userLogger, blockID.String())
if err == metadata.ErrorDeletionMarkNotFound {
err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{})
if err == metadata.ErrorMarkerNotFound {
continue
}
if err != nil {
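For context on the change above: the cleaner now reads the deletion mark through Thanos' generic marker API rather than the deletion-mark-specific helper. A minimal sketch of the resulting pattern, using only identifiers visible in this diff (variable names are illustrative):

// Read the deletion mark for a partial block via the generic marker API.
deletionMark := &metadata.DeletionMark{}
err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), deletionMark)
switch {
case err == metadata.ErrorMarkerNotFound:
	// No deletion mark: the partial block must be left alone.
case err != nil:
	// Unexpected failure: surface it to the caller.
default:
	// deletionMark is populated: the partial block is safe to hard-delete.
}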
39 changes: 23 additions & 16 deletions pkg/compactor/compactor.go
@@ -99,18 +99,19 @@ type Compactor struct {
// If empty, no users are disabled. If not empty, users in the map are disabled (not owned by this compactor).
disabledUsers map[string]struct{}

// Function that creates bucket client and TSDB compactor using the context.
// Function that creates bucket client, TSDB planner and compactor using the context.
// Useful for injecting mock objects from tests.
createBucketClientAndTsdbCompactor func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error)
createDependencies func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error)

// Users scanner, used to discover users from the bucket.
usersScanner *UsersScanner

// Blocks cleaner is responsible to hard delete blocks marked for deletion.
blocksCleaner *BlocksCleaner

// Underlying compactor used to compact TSDB blocks.
// Underlying compactor and planner used to compact TSDB blocks.
tsdbCompactor tsdb.Compactor
tsdbPlanner compact.Planner

// Client used to run operations on the bucket storing blocks.
bucketClient objstore.Bucket
@@ -135,17 +136,22 @@ type Compactor struct {

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
createBucketClientAndTsdbCompactor := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error) {
createDependencies := func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create the bucket client")
return nil, nil, nil, errors.Wrap(err, "failed to create the bucket client")
}

compactor, err := tsdb.NewLeveledCompactor(ctx, registerer, logger, compactorCfg.BlockRanges.ToMilliseconds(), downsample.NewPool())
return bucketClient, compactor, err
if err != nil {
return nil, nil, nil, err
}

planner := compact.NewTSDBBasedPlanner(logger, compactorCfg.BlockRanges.ToMilliseconds())
return bucketClient, compactor, planner, nil
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, createBucketClientAndTsdbCompactor)
cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, createDependencies)
if err != nil {
return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
}
@@ -158,16 +164,16 @@ func newCompactor(
storageCfg cortex_tsdb.BlocksStorageConfig,
logger log.Logger,
registerer prometheus.Registerer,
createBucketClientAndTsdbCompactor func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error),
createDependencies func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error),
) (*Compactor, error) {
c := &Compactor{
compactorCfg: compactorCfg,
storageCfg: storageCfg,
parentLogger: logger,
logger: log.With(logger, "component", "compactor"),
registerer: registerer,
syncerMetrics: newSyncerMetrics(registerer),
createBucketClientAndTsdbCompactor: createBucketClientAndTsdbCompactor,
compactorCfg: compactorCfg,
storageCfg: storageCfg,
parentLogger: logger,
logger: log.With(logger, "component", "compactor"),
registerer: registerer,
syncerMetrics: newSyncerMetrics(registerer),
createDependencies: createDependencies,

compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_compactor_runs_started_total",
@@ -223,7 +229,7 @@ func (c *Compactor) starting(ctx context.Context) error {
var err error

// Create bucket client and compactor.
c.bucketClient, c.tsdbCompactor, err = c.createBucketClientAndTsdbCompactor(ctx)
c.bucketClient, c.tsdbCompactor, c.tsdbPlanner, err = c.createDependencies(ctx)
if err != nil {
return errors.Wrap(err, "failed to initialize compactor objects")
}
@@ -472,6 +478,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
ulogger,
syncer,
grouper,
c.tsdbPlanner,
c.tsdbCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
bucket,
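The new `tsdbPlanner` field holds a Thanos `compact.Planner`, built with `compact.NewTSDBBasedPlanner` in `createDependencies` and passed alongside the compactor when compacting a user. Judging from how it is mocked in the tests below, the contract it has to satisfy looks roughly like this (a sketch, not the authoritative Thanos definition):

// Planner decides which blocks should be compacted together next.
type Planner interface {
	// Plan receives block metas sorted by MinTime and returns the subset to
	// compact in the next run; an empty result means there is nothing to do.
	Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error)
}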
53 changes: 32 additions & 21 deletions pkg/compactor/compactor_test.go
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
"gopkg.in/yaml.v2"

@@ -86,7 +87,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient.MockIter("", []string{}, nil)

c, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
defer cleanup()
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -229,7 +230,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
bucketClient := &cortex_tsdb.BucketClientMock{}
bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket"))

c, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
c, _, _, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
defer cleanup()
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -385,14 +386,14 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)

c, tsdbCompactor, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
c, _, tsdbPlanner, logs, registry, cleanup := prepare(t, prepareConfig(), bucketClient)
defer cleanup()

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -404,7 +405,7 @@
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

// Ensure a plan has been executed for the blocks of each user.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2)
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)

assert.ElementsMatch(t, []string{
`level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`,
@@ -496,14 +497,14 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)

c, tsdbCompactor, logs, registry, cleanup := prepare(t, cfg, bucketClient)
c, _, tsdbPlanner, logs, registry, cleanup := prepare(t, cfg, bucketClient)
defer cleanup()

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -515,7 +516,7 @@
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

// Only one user's block is compacted.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 1)
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 1)

assert.ElementsMatch(t, []string{
`level=info component=cleaner msg="started hard deletion of blocks marked for deletion"`,
@@ -600,14 +601,14 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec())

c, tsdbCompactor, logs, _, cleanup := prepare(t, cfg, bucketClient)
c, _, tsdbPlanner, logs, _, cleanup := prepare(t, cfg, bucketClient)
defer cleanup()

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)

require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

@@ -619,7 +620,7 @@
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))

// Ensure a plan has been executed for the blocks of each user.
tsdbCompactor.AssertNumberOfCalls(t, "Plan", 2)
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
@@ -683,18 +684,18 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

c, tsdbCompactor, l, _, cleanup := prepare(t, cfg, bucketClient)
c, _, tsdbPlanner, l, _, cleanup := prepare(t, cfg, bucketClient)
defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
defer cleanup()

compactors = append(compactors, c)
logs = append(logs, l)

// Mock the compactor as if there's no compaction to do,
// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbCompactor.On("Plan", mock.Anything).Return([]string{}, nil)
tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
}

// Start all compactors
@@ -844,7 +845,7 @@ func prepareConfig() Config {
return compactorCfg
}

func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *concurrency.SyncBuffer, prometheus.Gatherer, func()) {
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer, func()) {
storageCfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&storageCfg)

@@ -858,16 +859,17 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*
}

tsdbCompactor := &tsdbCompactorMock{}
tsdbPlanner := &tsdbPlannerMock{}
logs := &concurrency.SyncBuffer{}
logger := log.NewLogfmtLogger(logs)
registry := prometheus.NewRegistry()

c, err := newCompactor(compactorCfg, storageCfg, logger, registry, func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error) {
return bucketClient, tsdbCompactor, nil
c, err := newCompactor(compactorCfg, storageCfg, logger, registry, func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, compact.Planner, error) {
return bucketClient, tsdbCompactor, tsdbPlanner, nil
})
require.NoError(t, err)

return c, tsdbCompactor, logs, registry, cleanup
return c, tsdbCompactor, tsdbPlanner, logs, registry, cleanup
}

type tsdbCompactorMock struct {
@@ -889,6 +891,15 @@ func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Blo
return args.Get(0).(ulid.ULID), args.Error(1)
}

type tsdbPlannerMock struct {
mock.Mock
}

func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
args := m.Called(ctx, metasByMinTime)
return args.Get(0).([]*metadata.Meta), args.Error(1)
}

func mockBlockMetaJSON(id string) string {
meta := tsdb.BlockMeta{
Version: 1,
7 changes: 7 additions & 0 deletions pkg/storage/tsdb/config.go
@@ -262,6 +262,11 @@ type BucketStoreConfig struct {
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`

// Controls whether index-header lazy loading is enabled. This config option is hidden
// while it is marked as experimental.
IndexHeaderLazyLoadingEnabled bool `yaml:"index_header_lazy_loading_enabled" doc:"hidden"`
IndexHeaderLazyLoadingIdleTimeout time.Duration `yaml:"index_header_lazy_loading_idle_timeout" doc:"hidden"`

// Controls what is the ratio of postings offsets store will hold in memory.
// Larger value will keep less offsets, which will increase CPU cycles needed for query touching those postings.
// It's meant for setups that want low baseline memory pressure and where less traffic is expected.
@@ -288,6 +293,8 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
"Default is 6h, half of the default value for -compactor.deletion-delay.")
f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", store.DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.")
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazy load an index-header only once required by a query.")
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity.")
}

// Validate the config.
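The two new fields are also exposed through YAML as hidden, experimental options. Assuming the usual `blocks_storage` / `bucket_store` nesting of the Cortex configuration file (the nesting itself is outside this diff), enabling them would look roughly like this:

blocks_storage:
  bucket_store:
    # Experimental: mmap index-headers lazily, on first use by a query.
    index_header_lazy_loading_enabled: true
    # Release index-headers idle for this long; defaults to 20m, 0 disables offloading.
    index_header_lazy_loading_idle_timeout: 60m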