Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [ENHANCEMENT] Memberlist KV: incoming messages are now processed on per-key goroutine. This may reduce loss of "maintanance" packets in busy memberlist installations, but use more CPU. New `memberlist_client_received_broadcasts_dropped_total` counter tracks number of dropped per-key messages. #1912
* [ENHANCEMENT] Blocks Storage, Alertmanager, Ruler: add support a prefix to the bucket store (`*_storage.storage_prefix`). This enables using the same bucket for the three components. #1686 #1951
* [ENHANCEMENT] Upgrade Docker base images to `alpine:3.16.0`. #2028
* [ENHANCEMENT] Store-gateway: Add experimental configuration option for the store-gateway to attempt to pre-populate the file system cache when memory-mapping index-header files. Enabled with `-blocks-storage.bucket-store.index-header.map-populate-enabled=true`. #2019
* [BUGFIX] Fix regexp parsing panic for regexp label matchers with start/end quantifiers. #1883
* [BUGFIX] Ingester: fixed deceiving error log "failed to update cached shipped blocks after shipper initialisation", occurring for each new tenant in the ingester. #1893
* [BUGFIX] Ring: fix bug where instances may appear unhealthy in the hash ring web UI even though they are not. #1933
Expand Down
21 changes: 21 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -5131,6 +5131,27 @@
"fieldFlag": "blocks-storage.bucket-store.posting-offsets-in-mem-sampling",
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "block",
"name": "index_header",
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "map_populate_enabled",
"required": false,
"desc": "If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "blocks-storage.bucket-store.index-header.map-populate-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
"fieldDefaultValue": null
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ Usage of ./cmd/mimir/mimir:
If enabled, store-gateway will lazy load an index-header only once required by a query. (default true)
-blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout duration
If index-header lazy loading is enabled and this setting is > 0, the store-gateway will offload unused index-headers after 'idle timeout' inactivity. (default 1h0m0s)
-blocks-storage.bucket-store.index-header.map-populate-enabled
[experimental] If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.
-blocks-storage.bucket-store.max-chunk-pool-bytes uint
Max size - in bytes - of a chunks pool, used to reduce memory allocations. The pool is shared across all tenants. 0 to disable the limit. (default 2147483648)
-blocks-storage.bucket-store.max-concurrent int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3397,6 +3397,12 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.posting-offsets-in-mem-sampling
[postings_offsets_in_mem_sampling: <int> | default = 32]

index_header:
# (experimental) If enabled, the store-gateway will attempt to pre-populate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move index_header_lazy_loading_enabled to index_header section at some point, if this experiment survives.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid introducing a breaking change, I would rather do the opposite and use YAML inline for the new index header config struct in the bucket stores config. This would allow us to have a dedicated struct for index header config in the code, but not having to move existing stable flags.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on a PR that will touch this area of the code, #2048. I can open a PR to implement this change and update relevant internal (Grafana) configuration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually nevermind, I'll leave it as is for now and wait to see if our various experiments with index header changes survive.

# the file system cache when memory-mapping index-header files.
# CLI flag: -blocks-storage.bucket-store.index-header.map-populate-enabled
[map_populate_enabled: <boolean> | default = false]

tsdb:
# Directory to store TSDBs (including WAL) in the ingesters. This directory is
# required to be persisted between restarts.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/grafana-tools/sdk v0.0.0-20211220201350-966b3088eec9
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)

Expand Down Expand Up @@ -207,7 +208,6 @@ require (
go.uber.org/zap v1.19.1 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a // indirect
golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/tsdb/wal"

"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/storegateway/indexheader"
)

const (
Expand Down Expand Up @@ -292,6 +293,9 @@ type BucketStoreConfig struct {
// On the contrary, smaller value will increase baseline memory usage, but improve latency slightly.
// 1 will keep all in memory. Default value is the same as in Prometheus which gives a good balance.
PostingOffsetsInMemSampling int `yaml:"postings_offsets_in_mem_sampling" category:"advanced"`

// Controls experimental options for index-header file reading.
IndexHeader indexheader.BinaryReaderConfig `yaml:"index_header" category:"experimental"`
}

// RegisterFlags registers the BucketStore flags
Expand All @@ -300,6 +304,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.")
cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.")
cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.")
cfg.IndexHeader.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-header.")

f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "./tsdb-sync/", "Directory to store synchronized TSDB index headers. This directory is not required to be persisted between restarts, but it's highly recommended in order to improve the store-gateway startup time.")
f.DurationVar(&cfg.SyncInterval, "blocks-storage.bucket-store.sync-interval", 15*time.Minute, "How frequently to scan the bucket, or to refresh the bucket index (if enabled), in order to look for changes (new blocks shipped by ingesters and blocks deleted by retention or compaction).")
Expand Down
6 changes: 6 additions & 0 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ type BucketStore struct {
// Every how many posting offset entry we pool in heap memory. Default in Prometheus is 32.
postingOffsetsInMemSampling int

// Additional configuration for experimental indexheader.BinaryReader behaviour.
indexHeaderCfg indexheader.BinaryReaderConfig

// Enables hints in the Series() response.
enableSeriesResponseHints bool
}
Expand Down Expand Up @@ -218,6 +221,7 @@ func NewBucketStore(
partitioner Partitioner,
blockSyncConcurrency int,
postingOffsetsInMemSampling int,
indexHeaderCfg indexheader.BinaryReaderConfig,
enableSeriesResponseHints bool, // TODO(pracucci) Thanos 0.12 and below doesn't gracefully handle new fields in SeriesResponse. Drop this flag and always enable hints once we can drop backward compatibility.
lazyIndexReaderEnabled bool,
lazyIndexReaderIdleTimeout time.Duration,
Expand All @@ -240,6 +244,7 @@ func NewBucketStore(
seriesLimiterFactory: seriesLimiterFactory,
partitioner: partitioner,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
indexHeaderCfg: indexHeaderCfg,
enableSeriesResponseHints: enableSeriesResponseHints,
seriesHashCache: seriesHashCache,
metrics: metrics,
Expand Down Expand Up @@ -403,6 +408,7 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
s.dir,
meta.ULID,
s.postingOffsetsInMemSampling,
s.indexHeaderCfg,
)
if err != nil {
return errors.Wrap(err, "create index header reader")
Expand Down
2 changes: 2 additions & 0 deletions pkg/storegateway/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
"github.com/grafana/mimir/pkg/storegateway/indexcache"
"github.com/grafana/mimir/pkg/storegateway/indexheader"
"github.com/grafana/mimir/pkg/storegateway/testhelper"

"github.com/thanos-io/thanos/pkg/block"
Expand Down Expand Up @@ -171,6 +172,7 @@ func prepareStoreWithTestBlocksForSeries(t testing.TB, dir string, bkt objstore.
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
20,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
true,
time.Minute,
Expand Down
1 change: 1 addition & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStore, error) {
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
u.cfg.BucketStore.PostingOffsetsInMemSampling,
u.cfg.BucketStore.IndexHeader,
true, // Enable series hints.
u.cfg.BucketStore.IndexHeaderLazyLoadingEnabled,
u.cfg.BucketStore.IndexHeaderLazyLoadingIdleTimeout,
Expand Down
12 changes: 8 additions & 4 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ func prepareTestBlock(tb test.TB, series int) func() *bucketBlock {
})

id := uploadTestBlock(tb, tmpDir, bkt, series)
r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, id, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
require.NoError(tb, err)

return func() *bucketBlock {
Expand Down Expand Up @@ -1359,6 +1359,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
1,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
false,
false,
0,
Expand Down Expand Up @@ -1520,7 +1521,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
assert.NoError(t, err)
}

Expand Down Expand Up @@ -1559,7 +1560,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
}
b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
assert.NoError(t, err)
}

Expand Down Expand Up @@ -1723,6 +1724,7 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
false,
0,
Expand Down Expand Up @@ -1813,6 +1815,7 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
false,
0,
Expand Down Expand Up @@ -1996,6 +1999,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
10,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
true,
false,
0,
Expand Down Expand Up @@ -2296,7 +2300,7 @@ func prepareBucket(b *testing.B, resolutionLevel compactor.ResolutionLevel) (*bu
partitioner := newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil)

// Create an index header reader.
indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling)
indexHeaderReader, err := indexheader.NewBinaryReader(ctx, logger, bkt, tmpDir, blockMeta.ULID, mimir_tsdb.DefaultPostingOffsetInMemorySampling, indexheader.BinaryReaderConfig{})
assert.NoError(b, err)
indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.DefaultInMemoryIndexCacheConfig)
assert.NoError(b, err)
Expand Down
21 changes: 16 additions & 5 deletions pkg/storegateway/indexheader/binary_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bufio"
"context"
"encoding/binary"
"flag"
"hash"
"hash/crc32"
"io"
Expand All @@ -32,6 +33,8 @@ import (
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"

mmap "github.com/grafana/mimir/pkg/storegateway/indexheader/fileutil"
)

const (
Expand Down Expand Up @@ -459,10 +462,18 @@ type BinaryReader struct {
postingOffsetsInMemSampling int
}

type BinaryReaderConfig struct {
MapPopulateEnabled bool `yaml:"map_populate_enabled" category:"experimental"`
}

func (cfg *BinaryReaderConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.MapPopulateEnabled, prefix+"map-populate-enabled", false, "If enabled, the store-gateway will attempt to pre-populate the file system cache when memory-mapping index-header files.")
}

// NewBinaryReader loads or builds new index-header if not present on disk.
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) {
func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, cfg BinaryReaderConfig) (*BinaryReader, error) {
binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename)
br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling)
br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling, cfg)
if err == nil {
return br, nil
}
Expand All @@ -475,11 +486,11 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket
}

level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start))
return newFileBinaryReader(binfn, postingOffsetsInMemSampling)
return newFileBinaryReader(binfn, postingOffsetsInMemSampling, cfg)
}

func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) {
f, err := fileutil.OpenMmapFile(path)
func newFileBinaryReader(path string, postingOffsetsInMemSampling int, cfg BinaryReaderConfig) (bw *BinaryReader, err error) {
f, err := mmap.OpenMmapFile(path, cfg.MapPopulateEnabled)
if err != nil {
return nil, err
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/storegateway/indexheader/fileutil/mmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/tsdb/fileutil/mmap.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors.

package fileutil

import (
"os"

"github.com/pkg/errors"
)

type MmapFile struct {
f *os.File
b []byte
}

func OpenMmapFile(path string, populate bool) (*MmapFile, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as I understand this file is basically copied from Prometheus and then modified, right?
would it be worth it to upstream this change into Prometheus?

Copy link
Contributor

@replay replay Jun 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or if we don't want to upstream into upstream prometheus, we could still consider making the change in mimir-prometheus instead of copying the file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the size of the files, I don't think it's worth the effort doing either right now.

If the testing goes well, then I'll upstream the changes.

return OpenMmapFileWithSize(path, 0, populate)
}

func OpenMmapFileWithSize(path string, size int, populate bool) (mf *MmapFile, retErr error) {
f, err := os.Open(path)
if err != nil {
return nil, errors.Wrap(err, "try lock file")
}
defer func() {
if retErr != nil {
f.Close()
}
}()
if size <= 0 {
info, err := f.Stat()
if err != nil {
return nil, errors.Wrap(err, "stat")
}
size = int(info.Size())
}

b, err := mmap(f, size, populate)
if err != nil {
return nil, errors.Wrapf(err, "mmap, size %d", size)
}

return &MmapFile{f: f, b: b}, nil
}

func (f *MmapFile) Close() error {
err0 := munmap(f.b)
err1 := f.f.Close()

if err0 != nil {
return err0
}
return err1
}

func (f *MmapFile) File() *os.File {
return f.f
}

func (f *MmapFile) Bytes() []byte {
return f.b
}
27 changes: 27 additions & 0 deletions pkg/storegateway/indexheader/fileutil/mmap_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/tsdb/fileutil/mmap_unix.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors.

//go:build !windows && !plan9
// +build !windows,!plan9

package fileutil

import (
"os"

"golang.org/x/sys/unix"
)

func mmap(f *os.File, length int, populate bool) ([]byte, error) {
flags := unix.MAP_SHARED
if populate {
flags |= unix.MAP_POPULATE
}
return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, flags)
}

func munmap(b []byte) (err error) {
return unix.Munmap(b)
}
Loading