diff --git a/development/mimir-microservices-mode/docker-compose.jsonnet b/development/mimir-microservices-mode/docker-compose.jsonnet index c624d662e8f..c8a74661b30 100644 --- a/development/mimir-microservices-mode/docker-compose.jsonnet +++ b/development/mimir-microservices-mode/docker-compose.jsonnet @@ -185,6 +185,8 @@ std.manifestYamlDoc({ target: 'store-gateway', httpPort: 8010 + id, jaegerApp: 'store-gateway-%d' % id, + extraArguments: + if $._config.enable_parquet then '-store-gateway.parquet-enabled=true' else '', }) for id in std.range(1, count) }, diff --git a/pkg/storage/parquet/block/lazy_reader.go b/pkg/storage/parquet/block/lazy_reader.go new file mode 100644 index 00000000000..8e2bf8af3f0 --- /dev/null +++ b/pkg/storage/parquet/block/lazy_reader.go @@ -0,0 +1,439 @@ +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/block/indexheader/lazy_binary_reader.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Thanos Authors. + +package block + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/gate" + "github.com/oklog/ulid/v2" + "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/convert" + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/objstore" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" +) + +var ( + errNotIdle = errors.New("the reader is not idle") +) + +// LazyParquetReaderMetrics holds metrics tracked by LazyReaderLocalLabelsBucketChunks. +type LazyParquetReaderMetrics struct { + loadCount prometheus.Counter + loadFailedCount prometheus.Counter + unloadCount prometheus.Counter + unloadFailedCount prometheus.Counter + loadDuration prometheus.Histogram +} + +// NewLazyParquetReaderMetrics makes new LazyParquetReaderMetrics. +func NewLazyParquetReaderMetrics(reg prometheus.Registerer) *LazyParquetReaderMetrics { + return &LazyParquetReaderMetrics{ + loadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "parquet_reader_lazy_load_total", + Help: "Total number of parquet reader lazy load operations.", + }), + loadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "parquet_reader_lazy_load_failed_total", + Help: "Total number of failed parquet reader lazy load operations.", + }), + unloadCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "parquet_reader_lazy_unload_total", + Help: "Total number of parquet reader lazy unload operations.", + }), + unloadFailedCount: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "parquet_reader_lazy_unload_failed_total", + Help: "Total number of failed parquet reader lazy unload operations.", + }), + loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "parquet_reader_lazy_load_duration_seconds", + Help: "Duration of the parquet reader lazy loading in seconds.", + Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 120, 300}, + }), + } +} + +type readerRequest struct { + response chan loadedReader +} + +// loadedReader represents an attempt to load a Reader. If the attempt failed, then err is set and reader is nil. 
+// If the attempt succeeded, then err is nil, and inUse and reader are set. +// If the attempt succeeded, then inUse must be signalled when the reader is no longer in use. +type loadedReader struct { + reader Reader + inUse *sync.WaitGroup + + err error +} + +// unloadRequest is a request to unload a binary reader. +type unloadRequest struct { + // response will receive a single error with the result of the unload operation. + // response will not be closed. + response chan error + // idleSinceNanos is the unix nano timestamp of the last time this reader was used. + // If idleSinceNanos is 0, the check on the last usage is skipped. + idleSinceNanos int64 +} + +// LazyReaderLocalLabelsBucketChunks implements the parquet block Reader interface. +// The Reader downloads the block shard's labels file from bucket to disk +// but does not open the labels file from local disk or the chunks file from the bucket +// until the respective calls to the Reader interface's LabelsFile() or ChunksFile() methods. +type LazyReaderLocalLabelsBucketChunks struct { + ctx context.Context + + blockID ulid.ULID + shardIdx int + + // bkt to download the labels file to local disk and open the chunks file from the bucket + bkt objstore.InstrumentedBucketReader + localDir string + shardOpts []storage.ShardOption + + metrics *LazyParquetReaderMetrics + onClosed func(*LazyReaderLocalLabelsBucketChunks) + lazyLoadingGate gate.Gate + + loadedReader chan readerRequest + unloadReq chan unloadRequest + + // Keep track of the last time it was used. + usedAt *atomic.Int64 + + done chan struct{} + + logger log.Logger +} + +// NewLazyReaderLocalLabelsBucketChunks initializes a parquet block Reader +// and downloads the block shard's labels file from bucket to disk. +func NewLazyReaderLocalLabelsBucketChunks( + ctx context.Context, + blockID ulid.ULID, + bkt objstore.InstrumentedBucketReader, + localDir string, + //shardOpts []storage.ShardOption, + metrics *LazyParquetReaderMetrics, + onClosed func(*LazyReaderLocalLabelsBucketChunks), + lazyLoadingGate gate.Gate, + logger log.Logger, +) (*LazyReaderLocalLabelsBucketChunks, error) { + reader := &LazyReaderLocalLabelsBucketChunks{ + ctx: ctx, + blockID: blockID, + shardIdx: FirstShardIndex, + bkt: bkt, + localDir: localDir, + + metrics: metrics, + usedAt: atomic.NewInt64(0), + onClosed: onClosed, + + lazyLoadingGate: lazyLoadingGate, + + loadedReader: make(chan readerRequest), + unloadReq: make(chan unloadRequest), + done: make(chan struct{}), + + logger: logger, + } + + g := errgroup.Group{} + g.Go(func() error { + return reader.ensureLabelsFileToLocalDisk() + }) + + if err := g.Wait(); err != nil { + return nil, err + } + + go reader.controlLoop() + return reader, nil +} + +func (r *LazyReaderLocalLabelsBucketChunks) labelsFileName() string { + return schema.LabelsPfileNameForShard(r.blockID.String(), r.shardIdx) +} + +func (r *LazyReaderLocalLabelsBucketChunks) labelsFileLocalPath() string { + return filepath.Join(r.localDir, r.labelsFileName()) +} + +func (r *LazyReaderLocalLabelsBucketChunks) ensureLabelsFileToLocalDisk() error { + localPath := r.labelsFileLocalPath() + _, err := os.Stat(localPath) + if err == nil { + // file exists, nothing to do. 
+ return nil + } + if !os.IsNotExist(err) { + return errors.Wrap(err, "read parquet labels file from disk") + } + + level.Debug(r.logger).Log("msg", "parquet labels file not on disk; loading", "path", localPath) + start := time.Now() + if err := r.convertLabelsFileToLocalDisk(); err != nil { + return errors.Wrap(err, "write labels file") + } + level.Debug(r.logger).Log("msg", "loaded parquet labels file to disk", "path", localPath, "elapsed", time.Since(start)) + return nil +} + +// convertLabelsFileToLocalDisk utilizes the prometheus parquet convert package +// in order to download the labels file from the bucket to local disk. +// The usage of `convert` rather than a simple bucket download/copy is more CPU-intensive, +// but enables us to choose the output schema and projection of the labels file if desired +// and use the built-in read/write buffering and validation capabilities of the convert package. +func (r *LazyReaderLocalLabelsBucketChunks) convertLabelsFileToLocalDisk() error { + labelsFileName := r.labelsFileName() + bucketFileOpener := storage.NewParquetBucketOpener(r.bkt) + + bucketLabelsFile, err := bucketFileOpener.Open(r.ctx, labelsFileName, r.shardOpts...) + if err != nil { + return errors.Wrap(err, "open bucket parquet labels file") + } + bucketLabelsFileReader := parquet.NewGenericReader[any](bucketLabelsFile) + labelsFileSchema, err := schema.FromLabelsFile(bucketLabelsFile.File) + if err != nil { + return errors.Wrap(err, "get schema from bucket parquet labels file") + } + labelsProjection, err := labelsFileSchema.LabelsProjection() + if err != nil { + return errors.Wrap(err, "get schema projection from bucket parquet labels file schema") + } + outSchemaProjections := []*schema.TSDBProjection{labelsProjection} + + pipeReaderFileWriter := convert.NewPipeReaderFileWriter(r.localDir) + shardedBucketToFileWriter := convert.NewShardedWrite( + bucketLabelsFileReader, labelsFileSchema, outSchemaProjections, pipeReaderFileWriter, &convert.DefaultConvertOpts, + ) + err = shardedBucketToFileWriter.Write(r.ctx) + if err != nil { + return errors.Wrap(err, "convert bucket parquet labels file to disk") + } + return nil +} + +func (r *LazyReaderLocalLabelsBucketChunks) LabelsFile() *storage.ParquetFile { + loaded := r.getOrLoadReader(r.ctx) + return loaded.reader.LabelsFile() +} + +func (r *LazyReaderLocalLabelsBucketChunks) ChunksFile() *storage.ParquetFile { + loaded := r.getOrLoadReader(r.ctx) + return loaded.reader.ChunksFile() +} + +func (r *LazyReaderLocalLabelsBucketChunks) TSDBSchema() (*schema.TSDBSchema, error) { + loaded := r.getOrLoadReader(r.ctx) + return loaded.reader.TSDBSchema() +} + +// getOrLoadReader ensures the underlying binary index-header reader has been successfully loaded. +// Returns the reader, wait group that should be used to signal that usage of reader is finished, and an error on failure. +// Must be called without lock. +func (r *LazyReaderLocalLabelsBucketChunks) getOrLoadReader(ctx context.Context) loadedReader { + readerReq := readerRequest{response: make(chan loadedReader)} + select { + case <-r.done: + return loadedReader{err: errors.New("lazy reader is closed; this shouldn't happen")} + case r.loadedReader <- readerReq: + select { + case loadedR := <-readerReq.response: + return loadedR + case <-ctx.Done(): + // We will get a response on the channel, and if it's a loaded reader we need to signal that we're no longer using it. + // This should be rare, so spinning up a goroutine shouldn't be too expensive. 
+ go r.waitAndCloseReader(readerReq) + return loadedReader{err: context.Cause(ctx)} + } + case <-ctx.Done(): + return loadedReader{err: context.Cause(ctx)} + } +} + +// loadReader is called from getOrLoadReader, without any locks. +func (r *LazyReaderLocalLabelsBucketChunks) loadReader() (Reader, error) { + // lazyLoadingGate implementation: blocks load if too many are happening at once. + // It's important to get permit from the Gate when NOT holding the read-lock, otherwise we risk that multiple goroutines + // that enter `load()` will deadlock themselves. (If Start() allows one goroutine to continue, but blocks another one, + // then goroutine that continues would not be able to get Write lock.) + err := r.lazyLoadingGate.Start(r.ctx) + if err != nil { + return nil, errors.Wrapf(err, "failed to wait for turn") + } + defer r.lazyLoadingGate.Done() + + level.Debug(r.logger).Log("msg", "start lazy open local parquet labels file", "path", r.localDir) + r.metrics.loadCount.Inc() + startTime := time.Now() + + labelsLocalFilePath := r.labelsFileLocalPath() + labelsLocalFileOpener := NewParquetLocalFileOpener(r.localDir) + chunksBucketOpener := storage.NewParquetBucketOpener(r.bkt) + + reader, err := NewBasicReader( + r.ctx, + r.blockID.String(), + r.shardIdx, + labelsLocalFileOpener, + chunksBucketOpener, + r.shardOpts..., + ) + + if err != nil { + r.metrics.loadFailedCount.Inc() + return nil, errors.Wrapf(err, "lazy open local parquet labels file %s", labelsLocalFilePath) + } + + elapsed := time.Since(startTime) + level.Debug(r.logger).Log("msg", "finish lazy open local parquet labels file", "path", r.localDir, "elapsed", time.Since(startTime)) + + r.metrics.loadDuration.Observe(elapsed.Seconds()) + + return reader, nil +} + +// Close implements Reader. +func (r *LazyReaderLocalLabelsBucketChunks) Close() error { + select { + case <-r.done: + return nil // already closed + default: + } + if r.onClosed != nil { + defer r.onClosed(r) + } + + // Unload without checking if idle. + if err := r.unloadIfIdleSince(0); err != nil { + return fmt.Errorf("unload index-header: %w", err) + } + + close(r.done) + return nil +} + +func (r *LazyReaderLocalLabelsBucketChunks) waitAndCloseReader(req readerRequest) { + resp := <-req.response + if resp.reader != nil { + resp.inUse.Done() + } +} + +// unloadIfIdleSince closes underlying BinaryReader if the reader is idle since given time (as unix nano). If idleSince is 0, +// the check on the last usage is skipped. Calling this function on a already unloaded reader is a no-op. +func (r *LazyReaderLocalLabelsBucketChunks) unloadIfIdleSince(tsNano int64) error { + req := unloadRequest{ + // The channel is unbuffered because we will read the response. It should be buffered if we can give up before reading from it + response: make(chan error), + idleSinceNanos: tsNano, + } + select { + case r.unloadReq <- req: + return <-req.response + case <-r.done: + return nil // if the control loop has returned we can't do much other than return. + } +} + +func (r *LazyReaderLocalLabelsBucketChunks) controlLoop() { + var loaded loadedReader + + for { + select { + case <-r.done: + return + case readerReq := <-r.loadedReader: + if loaded.reader == nil { + // Try to load the reader if it hasn't been loaded before or if the previous loading failed. 
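+					// Note: resetting `loaded` below also clears the error left by a previous failed attempt,
+					// so each new request retries the load instead of being served a stale error.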
+ loaded = loadedReader{} + loaded.reader, loaded.err = r.loadReader() + if loaded.reader != nil { + loaded.inUse = &sync.WaitGroup{} + } + } + if loaded.reader != nil { + loaded.inUse.Add(1) + r.usedAt.Store(time.Now().UnixNano()) + } + readerReq.response <- loaded + + case unloadPromise := <-r.unloadReq: + if loaded.reader == nil { + // Nothing to do if already unloaded. + unloadPromise.response <- nil + continue + } + + // Do not unloadIfIdleSince if not idle. + if ts := unloadPromise.idleSinceNanos; ts > 0 && r.usedAt.Load() > ts { + unloadPromise.response <- errNotIdle + continue + } + + // Wait until all users finished using current reader. + waitReadersOrPanic(loaded.inUse) + + r.metrics.unloadCount.Inc() + if err := loaded.reader.Close(); err != nil { + r.metrics.unloadFailedCount.Inc() + unloadPromise.response <- fmt.Errorf("closing binary reader: %w", err) + continue + } + + loaded = loadedReader{} + r.usedAt.Store(0) + unloadPromise.response <- nil + } + } +} + +func waitReadersOrPanic(wg *sync.WaitGroup) { + // timeout is long enough for any request to finish. + // The idea is that we don't want to wait forever, but surface a bug. + const timeout = time.Hour + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + return + case <-time.After(timeout): + // It is illegal to leave the hanging wg.Wait() and later call wg.Add() on the same instance. + // So we panic here. + panic(fmt.Sprintf("timed out waiting for readers after %s, there is probably a bug keeping readers open, please report this", timeout)) + } +} + +// IsIdleSince returns true if the reader is idle since given time (as unix nano). +func (r *LazyReaderLocalLabelsBucketChunks) IsIdleSince(ts int64) bool { + lastUse := r.LoadedLastUse() + return lastUse != 0 && lastUse <= ts +} + +// LoadedLastUse returns 0 if the reader is not loaded. +// LoadedLastUse returns a timestamp in nanoseconds of the last time this reader was used. +func (r *LazyReaderLocalLabelsBucketChunks) LoadedLastUse() int64 { + return r.usedAt.Load() +} diff --git a/pkg/storage/parquet/block/reader.go b/pkg/storage/parquet/block/reader.go new file mode 100644 index 00000000000..dfef5785d78 --- /dev/null +++ b/pkg/storage/parquet/block/reader.go @@ -0,0 +1,158 @@ +package block + +import ( + "context" + "io" + "path/filepath" + "sync" + "time" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/storage" + "github.com/thanos-io/objstore" + "go.opentelemetry.io/otel" + "golang.org/x/sync/errgroup" +) + +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/block/indexheader/header.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Thanos Authors. + +const ( + DefaultIndexHeaderLazyLoadingEnabled = true + DefaultIndexHeaderLazyLoadingIdleTimeout = 60 * time.Minute +) + +var tracer = otel.Tracer("pkg/storage/parquet/index") + +var ( + errInvalidParquetIndexLazyLoadingConcurrency = errors.New("invalid parquet index lazy loading max concurrency; must be non-negative") +) + +// FirstShardIndex represents the default initial shard for a parquet block reader; +// TSDB blocks can be split into multiple shards when converted to our Parquet format, +// but this sharding is not in use yet; assume all blocks have a single Parquet shard for now. 
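+// For example, the labels and chunks object names for a block's only shard are the ones returned by
+// schema.LabelsPfileNameForShard(blockID, FirstShardIndex) and schema.ChunksPfileNameForShard(blockID, FirstShardIndex).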
+const FirstShardIndex = 0
+
+// Reader wraps access to a TSDB block's storage.ParquetShard interface and adds io.Closer for lifecycle management.
+type Reader interface {
+	storage.ParquetShard
+	io.Closer
+}
+
+type ParquetBucketOpener struct {
+	bkt    objstore.BucketReader
+	prefix string
+}
+
+func NewParquetBucketOpener(bkt objstore.BucketReader, prefix string) *ParquetBucketOpener {
+	return &ParquetBucketOpener{
+		bkt:    bkt,
+		prefix: prefix,
+	}
+}
+
+func (o *ParquetBucketOpener) Open(
+	ctx context.Context, name string, opts ...storage.ShardOption,
+) (*storage.ParquetFile, error) {
+	return storage.OpenFromBucket(ctx, o.bkt, filepath.Join(o.prefix, name), opts...)
+}
+
+type ParquetLocalFileOpener struct {
+	dir string
+}
+
+func NewParquetLocalFileOpener(dir string) *ParquetLocalFileOpener {
+	return &ParquetLocalFileOpener{
+		dir: dir,
+	}
+}
+
+func (o *ParquetLocalFileOpener) Open(
+	ctx context.Context, name string, opts ...storage.ShardOption,
+) (*storage.ParquetFile, error) {
+	return storage.OpenFromFile(ctx, filepath.Join(o.dir, name), opts...)
+}
+
+// BasicReader is a simple building-block implementation of the Reader interface.
+// Parquet labels and chunks files are opened immediately in the constructor;
+// lazy-loading or other lifecycle management can be implemented by wrapping this type.
+type BasicReader struct {
+	labelsFile, chunksFile *storage.ParquetFile
+	schema                 *schema.TSDBSchema
+	o                      sync.Once
+}
+
+func NewBasicReader(
+	ctx context.Context,
+	blockID string,
+	shard int,
+	labelsFileOpener storage.ParquetOpener,
+	chunksFileOpener storage.ParquetOpener,
+	opts ...storage.ShardOption,
+) (*BasicReader, error) {
+	labelsFileName := schema.LabelsPfileNameForShard(blockID, shard)
+	chunksFileName := schema.ChunksPfileNameForShard(blockID, shard)
+
+	errGroup := errgroup.Group{}
+
+	var labelsFile, chunksFile *storage.ParquetFile
+
+	errGroup.Go(func() (err error) {
+		labelsFile, err = labelsFileOpener.Open(ctx, labelsFileName, opts...)
+		return err
+	})
+
+	errGroup.Go(func() (err error) {
+		chunksFile, err = chunksFileOpener.Open(ctx, chunksFileName, opts...)
+ return err + }) + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return &BasicReader{ + labelsFile: labelsFile, + chunksFile: chunksFile, + }, nil +} + +func (r *BasicReader) LabelsFile() *storage.ParquetFile { + return r.labelsFile +} + +func (r *BasicReader) ChunksFile() *storage.ParquetFile { + return r.chunksFile +} + +func (r *BasicReader) TSDBSchema() (*schema.TSDBSchema, error) { + var err error + r.o.Do(func() { + r.schema, err = schema.FromLabelsFile(r.labelsFile.File) + }) + return r.schema, err +} + +func (r *BasicReader) Close() error { + err := &multierror.Error{} + err = multierror.Append(err, r.labelsFile.Close()) + err = multierror.Append(err, r.chunksFile.Close()) + // TODO figure out if we need to do anything with the loaded schema here + return err.ErrorOrNil() +} + +func labelsFileName(blockID string, shardIdx int) string { + return schema.LabelsPfileNameForShard(blockID, shardIdx) +} + +func labelsFileLocalPath(localDir, blockID string, shardIdx int) string { + return filepath.Join(localDir, labelsFileName(blockID, shardIdx)) +} + +func chunksFileName(blockID string, shardIdx int) string { + return schema.ChunksPfileNameForShard(blockID, shardIdx) +} diff --git a/pkg/storage/parquet/block/reader_pool.go b/pkg/storage/parquet/block/reader_pool.go new file mode 100644 index 00000000000..8a209c0f8e0 --- /dev/null +++ b/pkg/storage/parquet/block/reader_pool.go @@ -0,0 +1,187 @@ +package block + +// SPDX-License-Identifier: AGPL-3.0-only +// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/block/indexheader/reader_pool.go +// Provenance-includes-license: Apache-2.0 +// Provenance-includes-copyright: The Thanos Authors. + +import ( + "context" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/gate" + "github.com/grafana/dskit/services" + "github.com/oklog/ulid/v2" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + + "github.com/grafana/mimir/pkg/storage/indexheader" +) + +// ReaderPoolMetrics holds metrics tracked by ReaderPool. +type ReaderPoolMetrics struct { + lazyReader *LazyParquetReaderMetrics + //streamReader *StreamBinaryReaderMetrics +} + +// NewReaderPoolMetrics makes new ReaderPoolMetrics. +func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics { + return &ReaderPoolMetrics{ + lazyReader: NewLazyParquetReaderMetrics(reg), + //streamReader: NewStreamBinaryReaderMetrics(reg), + } +} + +// ReaderPool is used to istantiate new index-header readers and keep track of them. +// When the lazy reader is enabled, the pool keeps track of all instantiated readers +// and automatically close them once the idle timeout is reached. A closed lazy reader +// will be automatically re-opened upon next usage. +type ReaderPool struct { + services.Service + + lazyReaderEnabled bool + lazyReaderIdleTimeout time.Duration + logger log.Logger + metrics *ReaderPoolMetrics + + // Gate used to limit the number of concurrent index-header loads. + lazyLoadingGate gate.Gate + + // Keep track of all readers managed by the pool. + lazyReadersMx sync.Mutex + lazyReaders map[*LazyReaderLocalLabelsBucketChunks]struct{} +} + +// NewReaderPool makes a new ReaderPool. If lazy-loading is enabled, NewReaderPool also starts a background task for unloading idle Readers. 
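+//
+// Illustrative wiring (a sketch; variable names other than the parameters shown here are assumed):
+//
+//	pool := NewReaderPool(indexHeaderCfg, lazyLoadingGate, logger, reg)
+//	_ = services.StartAndAwaitRunning(ctx, pool) // starts the periodic idle-unload task
+//	reader, err := pool.GetReader(ctx, blockID, bkt, localDir, logger)
+//	if err == nil {
+//		defer reader.Close() // a reader closed by the caller is removed from the pool via onLazyReaderClosed
+//	}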
+func NewReaderPool( + indexHeaderConfig indexheader.Config, + lazyLoadingGate gate.Gate, + logger log.Logger, + reg prometheus.Registerer, +) *ReaderPool { + p := newReaderPool(indexHeaderConfig, lazyLoadingGate, logger, reg) + if !p.lazyReaderEnabled || p.lazyReaderIdleTimeout <= 0 { + panic("not implemented: parquet block reader pool without lazy loading") + //p.Service = services.NewIdleService(nil, nil) + } else { + p.Service = services.NewTimerService(p.lazyReaderIdleTimeout/10, nil, p.unloadIdleReaders, nil) + } + return p +} + +// newReaderPool makes a new ReaderPool. +func newReaderPool( + indexHeaderConfig indexheader.Config, + lazyLoadingGate gate.Gate, + logger log.Logger, + reg prometheus.Registerer, +) *ReaderPool { + return &ReaderPool{ + logger: logger, + metrics: NewReaderPoolMetrics(reg), + lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled, + lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout, + lazyReaders: make(map[*LazyReaderLocalLabelsBucketChunks]struct{}), + lazyLoadingGate: lazyLoadingGate, + } +} + +// GetReader creates and returns a new binary reader. If the pool has been configured +// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader +// is tracked by the pool and automatically closed once the idle timeout expires. +func (p *ReaderPool) GetReader( + ctx context.Context, + blockID ulid.ULID, + bkt objstore.InstrumentedBucketReader, + localDir string, + logger log.Logger, +) (Reader, error) { + var reader Reader + var err error + + reader, err = NewLazyReaderLocalLabelsBucketChunks( + ctx, + blockID, + bkt, + localDir, + p.metrics.lazyReader, + p.onLazyReaderClosed, + p.lazyLoadingGate, + logger, + ) + + if err != nil { + return nil, err + } + + // Keep track of lazy readers only if required. + if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 { + p.lazyReadersMx.Lock() + p.lazyReaders[reader.(*LazyReaderLocalLabelsBucketChunks)] = struct{}{} + p.lazyReadersMx.Unlock() + } + + return reader, err +} + +func (p *ReaderPool) unloadIdleReaders(context.Context) error { + idleTimeoutAgo := time.Now().Add(-p.lazyReaderIdleTimeout).UnixNano() + + for _, r := range p.getIdleReadersSince(idleTimeoutAgo) { + if err := r.unloadIfIdleSince(idleTimeoutAgo); err != nil && !errors.Is(err, errNotIdle) { + level.Warn(p.logger).Log("msg", "failed to close idle index-header reader", "err", err) + } + } + return nil // always return nil to avoid stopping the service +} + +func (p *ReaderPool) getIdleReadersSince(ts int64) []*LazyReaderLocalLabelsBucketChunks { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + var idle []*LazyReaderLocalLabelsBucketChunks + for r := range p.lazyReaders { + if r.IsIdleSince(ts) { + idle = append(idle, r) + } + } + + return idle +} + +func (p *ReaderPool) isTracking(r *LazyReaderLocalLabelsBucketChunks) bool { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + _, ok := p.lazyReaders[r] + return ok +} + +func (p *ReaderPool) onLazyReaderClosed(r *LazyReaderLocalLabelsBucketChunks) { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + // When this function is called, it means the reader has been closed NOT because was idle + // but because the consumer closed it. By contract, a reader closed by the consumer can't + // be used anymore, so we can automatically remove it from the pool. 
+ delete(p.lazyReaders, r) +} + +func (p *ReaderPool) LoadedBlocks() []ulid.ULID { + p.lazyReadersMx.Lock() + defer p.lazyReadersMx.Unlock() + + blocks := make([]ulid.ULID, 0, len(p.lazyReaders)) + for r := range p.lazyReaders { + usedAt := r.usedAt.Load() + if usedAt != 0 { + blocks = append(blocks, r.blockID) + } + } + + return blocks +} diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index fc9f0de3d89..2f445eff352 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -1963,7 +1963,7 @@ func (s *bucketBlockSet) timerange() (mint, maxt int64) { return mint, maxt } -// bucketBlock represents a block that is located in a bucket. It holds intermediate +// bucketBlock represents a block that is located in a bkt. It holds intermediate // state for the block on local disk. type bucketBlock struct { userID string diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index ca4f556019b..04bb44f70c2 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -208,7 +208,15 @@ func newStoreGateway(gatewayCfg Config, storageCfg mimir_tsdb.BlocksStorageConfi if gatewayCfg.ParquetEnabled { level.Info(logger).Log("msg", "store-gateway using parquet block format") - g.stores, err = NewParquetBucketStores(logger, prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) + g.stores, err = NewParquetBucketStores( + storageCfg, + limits, + allowedTenants, + shardingStrategy, + bucketClient, + logger, + prometheus.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg), + ) if err != nil { return nil, errors.Wrap(err, "create parquet bucket stores") } diff --git a/pkg/storegateway/parquet_bucket_block.go b/pkg/storegateway/parquet_bucket_block.go index 1dcd17e39a8..f28ce49da61 100644 --- a/pkg/storegateway/parquet_bucket_block.go +++ b/pkg/storegateway/parquet_bucket_block.go @@ -6,35 +6,27 @@ package storegateway import ( + "fmt" + "math" + "sort" "sync" "github.com/grafana/dskit/multierror" "github.com/oklog/ulid/v2" "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" -) - -// parquetBlockSet holds all blocks. -type parquetBlockSet struct { - mtx sync.RWMutex // nolint:unused - blockSet sync.Map // nolint:unused // Maps block's ulid.ULID to the *bucketBlock. - sortedBlocks []*parquetBlockWithMeta // nolint:unused // Blocks sorted by mint, then maxt. -} -// closeAll closes all blocks in the set and returns all encountered errors after trying all blocks. -func (s *parquetBlockSet) closeAll() error { - errs := multierror.New() - s.blockSet.Range(func(_, val any) bool { - errs.Add(val.(*parquetBlockWithMeta).Close()) - return true - }) - return errs.Err() -} + "github.com/grafana/mimir/pkg/storage/tsdb/block" +) -// parquetBlockWithMeta wraps storage.ParquetShardOpener with metadata adds metadata about the block. -type parquetBlockWithMeta struct { - storage.ParquetShardOpener - BlockID ulid.ULID +// parquetBucketBlock wraps access to the block's storage.ParquetShard interface +// with metadata, metrics and caching [etc once we fill in capabilities]. 
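+// The block's ULID is injected as a synthetic label (block.BlockIDLabel) by newParquetBucketBlock,
+// so that block-level matchers passed to parquetBlockSet.filter can select individual blocks via matchLabels.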
+type parquetBucketBlock struct { + meta *block.Meta + blockLabels labels.Labels + storage.ParquetShard + localDir string pendingReaders sync.WaitGroup closedMtx sync.RWMutex @@ -44,12 +36,43 @@ type parquetBlockWithMeta struct { queried atomic.Bool } -func (b *parquetBlockWithMeta) MarkQueried() { +func newParquetBucketBlock( + meta *block.Meta, + shard storage.ParquetShard, + localDir string, +) *parquetBucketBlock { + return &parquetBucketBlock{ + meta: meta, + // Inject the block ID as a label to allow to match blocks by ID. + blockLabels: labels.FromStrings(block.BlockIDLabel, meta.ULID.String()), + ParquetShard: shard, + localDir: localDir, + } +} + +// matchLabels verifies whether the block matches the given matchers. +func (b *parquetBucketBlock) matchLabels(matchers []*labels.Matcher) bool { + for _, m := range matchers { + if !m.Matches(b.blockLabels.Get(m.Name)) { + return false + } + } + return true +} + +// overlapsClosedInterval returns true if the block overlaps [mint, maxt). +func (b *parquetBucketBlock) overlapsClosedInterval(mint, maxt int64) bool { + // The block itself is a half-open interval + // [b.meta.MinTime, b.meta.MaxTime). + return b.meta.MinTime <= maxt && mint < b.meta.MaxTime +} + +func (b *parquetBucketBlock) MarkQueried() { b.queried.Store(true) } // Close waits for all pending readers to finish and then closes all underlying resources. -func (b *parquetBlockWithMeta) Close() error { +func (b *parquetBucketBlock) Close() error { b.closedMtx.Lock() b.closed = true b.closedMtx.Unlock() @@ -58,3 +81,179 @@ func (b *parquetBlockWithMeta) Close() error { return nil // TODO manage reader opening and closing through pools like indexheader does. } + +// parquetBlockSet holds all blocks. +type parquetBlockSet struct { + mtx sync.RWMutex // nolint:unused + blockSet sync.Map // nolint:unused // Maps block's ulid.ULID to the *parquetBucketBlock. + sortedBlocks []*parquetBucketBlock // nolint:unused // Blocks sorted by mint, then maxt. +} + +// add adds a block to the set. +func (s *parquetBlockSet) add(b *parquetBucketBlock) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + // The LoadOrStore verifies the block with the same id never ended up in the set more than once. + _, ok := s.blockSet.LoadOrStore(b.meta.ULID, b) + if ok { + // This should not ever happen. + return fmt.Errorf("block %s already exists in the set", b.meta.ULID) + } + + s.sortedBlocks = append(s.sortedBlocks, b) + + // Always sort blocks by min time, then max time. + sort.Slice(s.sortedBlocks, func(j, k int) bool { + if s.sortedBlocks[j].meta.MinTime == s.sortedBlocks[k].meta.MinTime { + return s.sortedBlocks[j].meta.MaxTime < s.sortedBlocks[k].meta.MaxTime + } + return s.sortedBlocks[j].meta.MinTime < s.sortedBlocks[k].meta.MinTime + }) + return nil +} + +// remove removes the block identified by id from the set. It returns the removed block if it was present in the set. +func (s *parquetBlockSet) remove(id ulid.ULID) *parquetBucketBlock { + s.mtx.Lock() + defer s.mtx.Unlock() + + val, ok := s.blockSet.LoadAndDelete(id) + if !ok { + return nil + } + + for i, b := range s.sortedBlocks { + if b.meta.ULID != id { + continue + } + s.sortedBlocks = append(s.sortedBlocks[:i], s.sortedBlocks[i+1:]...) 
+ break + } + + return val.(*parquetBucketBlock) +} + +func (s *parquetBlockSet) contains(id ulid.ULID) bool { + _, ok := s.blockSet.Load(id) + return ok +} + +func (s *parquetBlockSet) len() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + return len(s.sortedBlocks) +} + +// filter iterates over a time-ordered list of non-closed blocks that cover date between mint and maxt. It supports overlapping +// blocks. It only guaranties that a block is held open during the execution of fn. +func (s *parquetBlockSet) filter(mint, maxt int64, blockMatchers []*labels.Matcher, fn func(b *parquetBucketBlock)) { + if mint > maxt { + return + } + + s.mtx.RLock() + + // Fill the given interval with the blocks within the request mint and maxt. + bs := make([]*parquetBucketBlock, 0, len(s.sortedBlocks)) + for _, b := range s.sortedBlocks { + // NOTE: s.sortedBlocks are expected to be sorted in minTime order, their intervals are half-open: [b.MinTime, b.MaxTime). + if b.meta.MinTime > maxt { + break + } + + if b.overlapsClosedInterval(mint, maxt) { + // Include the block in the list of matching ones only if there are no block-level matchers + // or they actually match. + if len(blockMatchers) == 0 || b.matchLabels(blockMatchers) { + bs = append(bs, b) + } + } + } + + s.mtx.RUnlock() + + step := func(b *parquetBucketBlock) { + b.closedMtx.RLock() + defer b.closedMtx.RUnlock() + if !b.closed { + fn(b) + } + } + + for _, b := range bs { + step(b) + } +} + +// forEach iterates over all non-closed blocks in the set. It only guaranties that a block is held open during the execution of fn. +func (s *parquetBlockSet) forEach(fn func(b *parquetBucketBlock)) { + s.blockSet.Range(func(_, val any) bool { + b := val.(*parquetBucketBlock) + + b.closedMtx.RLock() + defer b.closedMtx.RUnlock() + + if !b.closed { + fn(b) + } + return true + }) +} + +// closeAll closes all blocks in the set and returns all encountered errors after trying all blocks. +func (s *parquetBlockSet) closeAll() error { + errs := multierror.New() + s.blockSet.Range(func(_, val any) bool { + errs.Add(val.(*parquetBucketBlock).Close()) + return true + }) + return errs.Err() +} + +// openBlocksULIDs returns the ULIDs of all blocks in the set which are not closed. +func (s *parquetBlockSet) openBlocksULIDs() []ulid.ULID { + ulids := make([]ulid.ULID, 0, s.len()) + s.forEach(func(b *parquetBucketBlock) { + ulids = append(ulids, b.meta.ULID) + }) + return ulids +} + +// allBlockULIDs returns the ULIDs of all blocks in the set regardless whether they are closed or not. +func (s *parquetBlockSet) allBlockULIDs() []ulid.ULID { + s.mtx.RLock() + defer s.mtx.RUnlock() + + ulids := make([]ulid.ULID, 0, len(s.sortedBlocks)) + for _, b := range s.sortedBlocks { + ulids = append(ulids, b.meta.ULID) + } + return ulids + +} + +// timerange returns the minimum and maximum timestamp available in the set. +func (s *parquetBlockSet) timerange() (mint, maxt int64) { + s.mtx.RLock() + defer s.mtx.RUnlock() + + if len(s.sortedBlocks) == 0 { + return math.MaxInt64, math.MinInt64 + } + + mint = math.MaxInt64 + maxt = math.MinInt64 + + // NOTE: s.sortedBlocks are expected to be sorted in minTime order. 
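+	// mint could be taken from the first block thanks to the sorting, but MaxTime is not monotonic
+	// across blocks, so scan them all to compute both bounds.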
+ for _, b := range s.sortedBlocks { + if b.meta.MinTime < mint { + mint = b.meta.MinTime + } + if b.meta.MaxTime > maxt { + maxt = b.meta.MaxTime + } + } + + return mint, maxt +} diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index 05c07bd585d..f84d84a32d2 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -7,21 +7,29 @@ package storegateway import ( "context" + "fmt" "os" + "path" + "path/filepath" + "sync" "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/gogo/protobuf/types" "github.com/grafana/dskit/gate" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/runutil" "github.com/grafana/dskit/services" + "github.com/oklog/ulid/v2" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/objstore" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + parquetBlock "github.com/grafana/mimir/pkg/storage/parquet/block" "github.com/grafana/mimir/pkg/storage/sharding" "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/storage/tsdb/block" @@ -38,11 +46,15 @@ type ParquetBucketStore struct { logger log.Logger - userID string - localDir string - bucketMetrics *BucketStoreMetrics // TODO: Create ParquetBucketStoreMetrics - bkt objstore.InstrumentedBucketReader - fetcher block.MetadataFetcher + userID string + + bkt objstore.InstrumentedBucketReader + fetcher block.MetadataFetcher + localDir string + readerPool *parquetBlock.ReaderPool + + // Metrics specific to bkt store operations + metrics *BucketStoreMetrics // TODO: Create ParquetBucketStoreMetrics // Set of blocks that have the same labels blockSet *parquetBlockSet @@ -82,6 +94,7 @@ func NewParquetBucketStore( seriesLimiterFactory SeriesLimiterFactory, metrics *BucketStoreMetrics, logger log.Logger, + reg prometheus.Registerer, ) (*ParquetBucketStore, error) { s := &ParquetBucketStore{ logger: logger, @@ -89,9 +102,9 @@ func NewParquetBucketStore( userID: userID, localDir: localDir, - bucketMetrics: metrics, - bkt: bkt, - fetcher: blockMetaFetcher, + metrics: metrics, + bkt: bkt, + fetcher: blockMetaFetcher, blockSet: &parquetBlockSet{}, blockSyncConcurrency: bucketStoreConfig.BlockSyncConcurrency, @@ -104,6 +117,13 @@ func NewParquetBucketStore( maxSeriesPerBatch: bucketStoreConfig.StreamingBatchSize, } + s.readerPool = parquetBlock.NewReaderPool( + bucketStoreConfig.IndexHeader, + s.lazyLoadingGate, + logger, + reg, + ) + if err := os.MkdirAll(localDir, 0750); err != nil { return nil, errors.Wrap(err, "create local localDir") } @@ -114,7 +134,7 @@ func NewParquetBucketStore( func (s *ParquetBucketStore) start(_ context.Context) error { // Use context.Background() so that we stop the index reader pool ourselves and do it after closing all blocks. 
- return services.StartAndAwaitRunning(context.Background(), nil) + return services.StartAndAwaitRunning(context.Background(), s.readerPool) } func (s *ParquetBucketStore) stop(err error) error { @@ -185,7 +205,7 @@ func (s *ParquetBucketStore) Series(req *storepb.SeriesRequest, srv storegateway resHints = &hintspb.SeriesResponseHints{} ) for _, shard := range shards { - resHints.AddQueriedBlock(shard.BlockID) + resHints.AddQueriedBlock(shard.meta.ULID) shard.MarkQueried() } if err := s.sendHints(srv, resHints); err != nil { @@ -197,8 +217,8 @@ func (s *ParquetBucketStore) Series(req *storepb.SeriesRequest, srv storegateway var ( seriesSet storepb.SeriesSet seriesLoadStart = time.Now() - chunksLimiter = s.chunksLimiterFactory(s.bucketMetrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter = s.seriesLimiterFactory(s.bucketMetrics.queriesDropped.WithLabelValues("series")) + chunksLimiter = s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) ) // Placeholder: Create series set for streaming labels from parquet shards @@ -225,8 +245,8 @@ func (s *ParquetBucketStore) Series(req *storepb.SeriesRequest, srv storegateway // We create the limiter twice in the case of streaming so that we don't double count the series // and hit the limit prematurely. - chunksLimiter := s.chunksLimiterFactory(s.bucketMetrics.queriesDropped.WithLabelValues("chunks")) - seriesLimiter := s.seriesLimiterFactory(s.bucketMetrics.queriesDropped.WithLabelValues("series")) + chunksLimiter := s.chunksLimiterFactory(s.metrics.queriesDropped.WithLabelValues("chunks")) + seriesLimiter := s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) start := time.Now() if req.StreamingChunksBatchSize > 0 { @@ -275,7 +295,7 @@ func (s *ParquetBucketStore) LabelValues(ctx context.Context, req *storepb.Label } // Placeholder methods for parquet-specific functionality -func (s *ParquetBucketStore) openParquetShardsForReading(ctx context.Context, skipChunks bool, minTime, maxTime int64, reqBlockMatchers []*labels.Matcher, stats *safeQueryStats) []*parquetBlockWithMeta { +func (s *ParquetBucketStore) openParquetShardsForReading(ctx context.Context, skipChunks bool, minTime, maxTime int64, reqBlockMatchers []*labels.Matcher, stats *safeQueryStats) []*parquetBucketBlock { // TODO: Implement parquet shard discovery and opening logic // This should: // 1. Discover parquet shards that intersect with the time range @@ -296,7 +316,7 @@ func (s *ParquetBucketStore) sendHints(srv storegatewaypb.StoreGateway_SeriesSer panic("TODO: implement sendHints") } -func (s *ParquetBucketStore) createParquetSeriesSetForLabels(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetBlockWithMeta, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) (storepb.SeriesSet, error) { +func (s *ParquetBucketStore) createParquetSeriesSetForLabels(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetBucketBlock, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) (storepb.SeriesSet, error) { // TODO: Implement parquet series set creation for labels phase // This should: // 1. 
"Stream read" .labels.parquet files from shards using shard.LabelsFile() @@ -317,7 +337,7 @@ func (s *ParquetBucketStore) sendStreamingChunks(req *storepb.SeriesRequest, srv panic("TODO: implement sendStreamingChunks") } -func (s *ParquetBucketStore) createParquetSeriesChunksSetIterator(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetBlockWithMeta, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) iterator[seriesChunksSet] { +func (s *ParquetBucketStore) createParquetSeriesChunksSetIterator(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetBucketBlock, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) iterator[seriesChunksSet] { // TODO: Implement parquet series chunks iterator creation // This should: // 1. Stream read .chunks.parquet files from shards using shard.ChunksFile() @@ -331,7 +351,7 @@ func (s *ParquetBucketStore) sendSeriesChunks(req *storepb.SeriesRequest, srv st panic("TODO: implement sendSeriesChunks") } -func (s *ParquetBucketStore) createParquetSeriesSetWithChunks(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetBlockWithMeta, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) (storepb.SeriesSet, error) { +func (s *ParquetBucketStore) createParquetSeriesSetWithChunks(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetBucketBlock, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) (storepb.SeriesSet, error) { // TODO: Implement parquet series set creation for non-streaming request // I think this should create a series that includes the labels in one go and its typically called when skipchunks is true panic("TODO: implement createParquetSeriesSetWithChunks") @@ -352,6 +372,254 @@ func (s *ParquetBucketStore) sendStats(srv storegatewaypb.StoreGateway_SeriesSer panic("TODO: implement sendStats") } +// Stats returns statistics about the BucketStore instance. +func (s *ParquetBucketStore) Stats() BucketStoreStats { + return BucketStoreStats{ + BlocksLoadedTotal: s.blockSet.len(), + } +} + +// InitialSync perform blocking sync with extra step at the end to delete locally saved blocks that are no longer +// present in the bucket. The mismatch of these can only happen between restarts, so we can do that only once per startup. +func (s *ParquetBucketStore) InitialSync(ctx context.Context) error { + // Read the snapshot before running the sync. After we run a sync we'll start persisting the snapshots again, + // so we need to read the pre-shutdown snapshot before the sync. 
+ + // TODO implement aspects that rely on the indexheader for parquet blocks + + //previouslyLoadedBlocks := s.tryRestoreLoadedBlocksSet() + + if err := s.syncBlocks(ctx); err != nil { + return errors.Wrap(err, "sync block") + } + //if s.indexHeaderCfg.EagerLoadingStartupEnabled { + // s.loadBlocks(ctx, previouslyLoadedBlocks) + //} + + err := s.cleanUpUnownedBlocks() + if err != nil { + return err + } + + return nil +} + +func (s *ParquetBucketStore) SyncBlocks(ctx context.Context) error { + return s.syncBlocks(ctx) +} + +func (s *ParquetBucketStore) syncBlocks(ctx context.Context) error { + metas, _, metaFetchErr := s.fetcher.Fetch(ctx) + // For partial view allow adding new blocks at least. + if metaFetchErr != nil && metas == nil { + return metaFetchErr + } + + var wg sync.WaitGroup + blockc := make(chan *block.Meta) + + for i := 0; i < s.blockSyncConcurrency; i++ { + wg.Add(1) + go func() { + for meta := range blockc { + if err := s.addBlock(ctx, meta); err != nil { + continue + } + } + wg.Done() + }() + } + + for id, meta := range metas { + if s.blockSet.contains(id) { + continue + } + select { + case <-ctx.Done(): + case blockc <- meta: + } + } + + close(blockc) + wg.Wait() + + if metaFetchErr != nil { + return metaFetchErr + } + + blockIDs := s.blockSet.openBlocksULIDs() + for _, id := range blockIDs { + if _, ok := metas[id]; ok { + continue + } + if err := s.removeBlock(id); err != nil { + level.Warn(s.logger).Log("msg", "drop of outdated block failed", "block", id, "err", err) + } + level.Info(s.logger).Log("msg", "dropped outdated block", "block", id) + } + + // Start snapshotter in the end of the sync, but do that only once per BucketStore's lifetime. + // We do that here, so the snapshotter watched after blocks from both initial sync and those discovered later. + // If it's already started this will return an error. We ignore that because syncBlocks can run multiple times + // We pass context.Background() because we want to stop it ourselves as opposed to stopping it as soon as the runtime context is cancelled.. 
+ //_ = s.snapshotter.StartAsync(context.Background()) + + return nil +} + +func (s *ParquetBucketStore) addBlock(ctx context.Context, meta *block.Meta) (err error) { + blockLocalDir := filepath.Join(s.localDir, meta.ULID.String()) + start := time.Now() + + level.Debug(s.logger).Log("msg", "loading new block", "id", meta.ULID) + defer func() { + if err != nil { + s.metrics.blockLoadFailures.Inc() + if err2 := os.RemoveAll(blockLocalDir); err2 != nil { + level.Warn(s.logger).Log("msg", "failed to remove block we cannot load", "err", err2) + } + level.Error(s.logger).Log("msg", "loading block failed", "elapsed", time.Since(start), "id", meta.ULID, "err", err) + } else { + level.Info(s.logger).Log("msg", "loaded new block", "elapsed", time.Since(start), "id", meta.ULID) + } + }() + s.metrics.blockLoads.Inc() + + // TODO get shard reader from pool + blockReader, err := s.readerPool.GetReader( + ctx, + meta.ULID, + s.bkt, + blockLocalDir, + s.logger, + ) + + if err != nil { + return errors.Wrap(err, "create parquet block reader") + } + + defer func() { + if err != nil { + runutil.CloseWithErrCapture(&err, blockReader, "parquet block reader") + } + }() + + b := newParquetBucketBlock( + meta, + nil, + blockLocalDir, + ) + if err != nil { + return errors.Wrap(err, "new parquet bucket block") + } + defer func() { + if err != nil { + runutil.CloseWithErrCapture(&err, b, "index-header") + } + }() + + if err = s.blockSet.add(b); err != nil { + return errors.Wrap(err, "add block to set") + } + + return nil +} +func (s *ParquetBucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]struct{} { + // TODO implement for parquet blocks + //previouslyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(s.localDir) + //if err != nil { + // level.Warn(s.logger).Log( + // "msg", "loading the list of index-headers from snapshot file failed; not eagerly loading index-headers for tenant", + // "dir", s.localDir, + // "err", err, + // ) + // // Don't fail initialization. If eager loading doesn't happen, then we will load index-headers lazily. + // // Lazy loading which is slower, but not worth failing startup for. + //} + //return previouslyLoadedBlocks + return nil +} + +func (s *ParquetBucketStore) removeBlock(id ulid.ULID) (returnErr error) { + defer func() { + if returnErr != nil { + s.metrics.blockDropFailures.Inc() + } + }() + + b := s.blockSet.remove(id) + if b == nil { + return nil + } + + // The block has already been removed from BucketStore, so we track it as removed + // even if releasing its resources could fail below. + s.metrics.blockDrops.Inc() + + if err := b.Close(); err != nil { + return errors.Wrap(err, "close block") + } + if err := os.RemoveAll(b.localDir); err != nil { + return errors.Wrap(err, "delete block") + } + return nil +} + +// RemoveBlocksAndClose remove all blocks from local disk and releases all resources associated with the BucketStore. +func (s *ParquetBucketStore) RemoveBlocksAndClose() error { + errs := multierror.New() + if err := services.StopAndAwaitTerminated(context.Background(), s); err != nil { + errs.Add(fmt.Errorf("stopping subservices: %w", err)) + } + // Remove the blocks even if the service didn't gracefully stop. + // We want to free up disk resources given these blocks will likely not be queried again. 
+ if err := s.removeAllBlocks(); err != nil { + errs.Add(fmt.Errorf("remove all blocks: %w", err)) + } + + return errs.Err() +} + +func (s *ParquetBucketStore) removeAllBlocks() error { + blockIDs := s.blockSet.allBlockULIDs() + + errs := multierror.New() + for _, id := range blockIDs { + if err := s.removeBlock(id); err != nil { + errs.Add(errors.Wrap(err, fmt.Sprintf("block: %s", id.String()))) + } + } + + return errs.Err() +} + func (s *ParquetBucketStore) closeAllBlocks() error { return s.blockSet.closeAll() } + +func (s *ParquetBucketStore) cleanUpUnownedBlocks() error { + fis, err := os.ReadDir(s.localDir) + if err != nil { + return errors.Wrap(err, "read dir") + } + names := make([]string, 0, len(fis)) + for _, fi := range fis { + names = append(names, fi.Name()) + } + for _, n := range names { + id, ok := block.IsBlockDir(n) + if !ok { + continue + } + if s.blockSet.contains(id) { + continue + } + + // No such block loaded, remove the local dir. + if err := os.RemoveAll(path.Join(s.localDir, id.String())); err != nil { + level.Warn(s.logger).Log("msg", "failed to remove block which is not needed", "err", err) + } + } + + return nil +} diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index 89f3c0ef476..2538586a1c2 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -8,19 +8,30 @@ package storegateway import ( "context" "fmt" + "os" + "path/filepath" "sync" + "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/gate" + "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/objstore" + "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/storage/tsdb" + "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" "github.com/grafana/mimir/pkg/util" + util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" ) @@ -28,19 +39,19 @@ import ( type ParquetBucketStores struct { services.Service - logger log.Logger - reg prometheus.Registerer + cfg tsdb.BlocksStorageConfig + limits *validation.Overrides // nolint:unused + allowedTenants *util.AllowList + shardingStrategy ShardingStrategy // nolint:unused + syncBackoffConfig backoff.Config // nolint:unused - limits *validation.Overrides // nolint:unused - // Tenants that are specifically enabled or disabled via configuration - allowedTenants *util.AllowList - - bucket objstore.Bucket + bkt objstore.Bucket // Metrics specific to bucket store operations bucketStoreMetrics *BucketStoreMetrics // nolint:unused // TODO: Create ParquetBucketStoreMetrics metaFetcherMetrics *MetadataFetcherMetrics // nolint:unused - shardingStrategy ShardingStrategy // nolint:unused - syncBackoffConfig backoff.Config // nolint:unused + + // Gate used to limit query concurrency across all tenants. + queryGate gate.Gate // Gate used to limit concurrency on loading index-headers across all tenants. 
lazyLoadingGate gate.Gate // nolint:unused @@ -55,19 +66,92 @@ type ParquetBucketStores struct { tenantsDiscovered prometheus.Gauge // nolint:unused tenantsSynced prometheus.Gauge // nolint:unused blocksLoaded *prometheus.Desc // nolint:unused + + logger log.Logger + reg prometheus.Registerer } // NewParquetBucketStores initializes a Parquet implementation of the Stores interface. func NewParquetBucketStores( + cfg tsdb.BlocksStorageConfig, + limits *validation.Overrides, + allowedTenants *util.AllowList, + shardingStrategy ShardingStrategy, + bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, ) (*ParquetBucketStores, error) { + gateReg := prometheus.WrapRegistererWithPrefix("cortex_bucket_stores_", reg) + + // The number of concurrent queries against the tenants BucketStores are limited. + queryGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "query"}, gateReg) + queryGate := gate.NewBlocking(cfg.BucketStore.MaxConcurrent) + queryGate = gate.NewInstrumented(queryGateReg, cfg.BucketStore.MaxConcurrent, queryGate) + queryGate = timeoutGate{delegate: queryGate, timeout: cfg.BucketStore.MaxConcurrentQueueTimeout} + + // The number of concurrent index header loads from storegateway are limited. + lazyLoadingGateReg := prometheus.WrapRegistererWith(prometheus.Labels{"gate": "index_header"}, gateReg) + lazyLoadingGate := gate.NewNoop() + lazyLoadingMax := cfg.BucketStore.IndexHeader.LazyLoadingConcurrency + if lazyLoadingMax != 0 { + lazyLoadingGate = gate.NewBlocking(cfg.BucketStore.IndexHeader.LazyLoadingConcurrency) + lazyLoadingGate = gate.NewInstrumented(lazyLoadingGateReg, cfg.BucketStore.IndexHeader.LazyLoadingConcurrency, lazyLoadingGate) + lazyLoadingGate = timeoutGate{delegate: lazyLoadingGate, timeout: cfg.BucketStore.IndexHeader.LazyLoadingConcurrencyQueueTimeout} + } + stores := &ParquetBucketStores{ + cfg: cfg, + limits: limits, + allowedTenants: allowedTenants, + shardingStrategy: shardingStrategy, + syncBackoffConfig: backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 3, + }, + + bkt: bkt, + stores: map[string]*ParquetBucketStore{}, + + bucketStoreMetrics: NewBucketStoreMetrics(reg), + metaFetcherMetrics: NewMetadataFetcherMetrics(logger), + queryGate: queryGate, + lazyLoadingGate: lazyLoadingGate, + logger: logger, reg: reg, } - stores.Service = services.NewIdleService(nil, nil) + + // Register metrics. 
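+	// Note: these reuse the same cortex_bucket_stores_* metric names as the TSDB-format BucketStores,
+	// presumably so existing dashboards and alerts keep working when the parquet store is enabled.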
+ stores.syncTimes = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_bucket_stores_blocks_sync_seconds", + Help: "The total time it takes to perform a sync stores", + Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900}, + }) + stores.syncLastSuccess = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds", + Help: "Unix timestamp of the last successful blocks sync.", + }) + stores.tenantsDiscovered = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_tenants_discovered", + Help: "Number of tenants discovered in the bucket.", + }) + stores.tenantsSynced = promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_tenants_synced", + Help: "Number of tenants synced.", + }) + stores.blocksLoaded = prometheus.NewDesc( + "cortex_bucket_store_blocks_loaded", + "Number of currently loaded blocks.", + nil, nil, + ) + + if reg != nil { + reg.MustRegister(stores.metaFetcherMetrics) + reg.MustRegister(stores) + } + stores.Service = services.NewIdleService(stores.initialSync, stores.stopBucketStores) return stores, nil } @@ -129,15 +213,158 @@ func (ss *ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.Lab return store.LabelValues(ctx, req) } +func (ss *ParquetBucketStores) stopBucketStores(error) error { + ss.storesMu.Lock() + defer ss.storesMu.Unlock() + errs := multierror.New() + for userID, bs := range ss.stores { + err := services.StopAndAwaitTerminated(context.Background(), bs) + if err != nil { + errs.Add(fmt.Errorf("closing bucket store for user %s: %w", userID, err)) + } + } + return errs.Err() +} + +// initialSync does an initial synchronization of blocks for all users. +func (ss *ParquetBucketStores) initialSync(ctx context.Context) error { + level.Info(ss.logger).Log("msg", "synchronizing TSDB blocks for all users") + + if err := ss.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, store *ParquetBucketStore) error { + return store.InitialSync(ctx) + }); err != nil { + level.Warn(ss.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) + return fmt.Errorf("initial synchronisation with bucket: %w", err) + } + + level.Info(ss.logger).Log("msg", "successfully synchronized TSDB blocks for all users") + return nil +} + +// SyncBlocks synchronizes the stores state with the Bucket store for every user. 
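+// Each call re-runs syncUsersBlocksWithRetries across the tenants currently owned by this instance,
+// lazily creating per-tenant bucket stores as new tenants are discovered.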
 func (ss *ParquetBucketStores) SyncBlocks(ctx context.Context) error {
-	// TODO implement me
-	panic("implement me")
+	return ss.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, store *ParquetBucketStore) error {
+		return store.SyncBlocks(ctx)
+	})
+}
+
+func (ss *ParquetBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *ParquetBucketStore) error) error {
+	retries := backoff.New(ctx, ss.syncBackoffConfig)
+
+	var lastErr error
+	for retries.Ongoing() {
+		userIDs, err := ss.ownedUsers(ctx)
+		if err != nil {
+			retries.Wait()
+			continue
+		}
+		lastErr = ss.syncUsersBlocks(ctx, userIDs, f)
+		if lastErr == nil {
+			return nil
+		}
+
+		retries.Wait()
+	}
+
+	if lastErr == nil {
+		return retries.Err()
+	}
+
+	return lastErr
+}
+
+func (ss *ParquetBucketStores) ownedUsers(ctx context.Context) ([]string, error) {
+	userIDs, err := ss.scanUsers(ctx)
+	if err != nil {
+		return nil, err
+	}
+	ss.tenantsDiscovered.Set(float64(len(userIDs)))
+
+	ownedUserIDs, err := ss.shardingStrategy.FilterUsers(ctx, userIDs)
+	if err != nil {
+		return nil, errors.Wrap(err, "unable to check tenants owned by this store-gateway instance")
+	}
+
+	return ownedUserIDs, nil
+}
+
+func (ss *ParquetBucketStores) syncUsersBlocks(ctx context.Context, includeUserIDs []string, f func(context.Context, *ParquetBucketStore) error) (returnErr error) {
+	defer func(start time.Time) {
+		ss.syncTimes.Observe(time.Since(start).Seconds())
+		if returnErr == nil {
+			ss.syncLastSuccess.SetToCurrentTime()
+		}
+	}(time.Now())
+
+	type job struct {
+		userID string
+		store  *ParquetBucketStore
+	}
+
+	wg := &sync.WaitGroup{}
+	jobs := make(chan job)
+	errs := tsdb_errors.NewMulti()
+	errsMx := sync.Mutex{}
+
+	ss.tenantsSynced.Set(float64(len(includeUserIDs)))
+
+	// Create a pool of workers which will synchronize blocks. The pool size
+	// is limited to avoid concurrently syncing a large number of tenants in
+	// a large cluster.
+	for i := 0; i < ss.cfg.BucketStore.TenantSyncConcurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			for job := range jobs {
+				if err := f(ctx, job.store); err != nil {
+					errsMx.Lock()
+					errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID))
+					errsMx.Unlock()
+				}
+			}
+		}()
+	}
+
+	// Lazily create a bucket store for each new user found
+	// and submit a sync job for each user.
+	for _, userID := range includeUserIDs {
+		bs, err := ss.getOrCreateStore(ctx, userID)
+		if err != nil {
+			errsMx.Lock()
+			errs.Add(err)
+			errsMx.Unlock()
+
+			continue
+		}
+
+		select {
+		case jobs <- job{userID: userID, store: bs}:
+			// Nothing to do. Will loop to push more jobs.
+		case <-ctx.Done():
+			// Wait until all workers are done, so the goroutine leak detector doesn't
+			// report any issue. This is expected to be quick, considering the done ctx
+			// is used by the worker callback function too.
+			close(jobs)
+			wg.Wait()
+
+			return ctx.Err()
+		}
+	}
+
+	// Wait until all workers have completed.
+	close(jobs)
+	wg.Wait()
+
+	ss.closeBucketStoreAndDeleteLocalFilesForExcludedTenants(includeUserIDs)
+
+	return errs.Err()
 }
 
 // scanUsers in the bucket and return the list of found users, respecting any specifically
 // enabled or disabled users.
 func (ss *ParquetBucketStores) scanUsers(ctx context.Context) ([]string, error) {
-	users, err := tsdb.ListUsers(ctx, ss.bucket)
+	users, err := tsdb.ListUsers(ctx, ss.bkt)
 	if err != nil {
 		return nil, err
 	}
@@ -157,3 +384,196 @@ func (ss *ParquetBucketStores) getStore(userID string) *ParquetBucketStore {
 	defer ss.storesMu.RUnlock()
 	return ss.stores[userID]
 }
+
+func (ss *ParquetBucketStores) getOrCreateStore(ctx context.Context, userID string) (*ParquetBucketStore, error) {
+	// Check if the store already exists.
+	bs := ss.getStore(userID)
+	if bs != nil {
+		return bs, nil
+	}
+
+	ss.storesMu.Lock()
+	defer ss.storesMu.Unlock()
+
+	// Check again for the store in the event it was created in-between locks.
+	bs = ss.stores[userID]
+	if bs != nil {
+		return bs, nil
+	}
+
+	userLogger := util_log.WithUserID(userID, ss.logger)
+
+	level.Info(userLogger).Log("msg", "creating user bucket store")
+
+	userBkt := bucket.NewUserBucketClient(userID, ss.bkt, ss.limits)
+	fetcherReg := prometheus.NewRegistry()
+
+	// The sharding strategy filter MUST be before the ones we create here (order matters).
+	filters := []block.MetadataFilter{
+		NewShardingMetadataFilterAdapter(userID, ss.shardingStrategy),
+		newMinTimeMetaFilter(ss.cfg.BucketStore.IgnoreBlocksWithin),
+		// Use our own custom implementation of the ignore-deletion-mark filter.
+		NewIgnoreDeletionMarkFilter(userLogger, userBkt, ss.cfg.BucketStore.IgnoreDeletionMarksInStoreGatewayDelay, ss.cfg.BucketStore.MetaSyncConcurrency),
+		// The duplicate filter has been intentionally omitted because it could cause trouble with
+		// the consistency check done on the querier. The duplicate filter removes redundant blocks
+		// but if the store-gateway removes redundant blocks before the querier discovers them, the
+		// consistency check on the querier will fail.
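+		// Deduplication of overlapping blocks is therefore left to the querier, which
+		// handles it at query time.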
+	}
+	fetcher := NewBucketIndexMetadataFetcher(
+		userID,
+		ss.bkt,
+		ss.limits,
+		ss.logger,
+		fetcherReg,
+		filters,
+	)
+	//bucketStoreOpts := []BucketStoreOption{
+	//	WithLogger(userLogger),
+	//	//WithIndexCache(ss.indexCache),
+	//	WithQueryGate(ss.queryGate),
+	//	WithLazyLoadingGate(ss.lazyLoadingGate),
+	//}
+
+	//bs, err := NewBucketStore(
+	//	userID,
+	//	userBkt,
+	//	fetcher,
+	//	ss.syncDirForUser(userID),
+	//	ss.cfg.BucketStore,
+	//	worstCaseFetchedDataStrategy{postingListActualSizeFactor: ss.cfg.BucketStore.SeriesFetchPreference},
+	//	NewChunksLimiterFactory(func() uint64 {
+	//		return uint64(ss.limits.MaxChunksPerQuery(userID))
+	//	}),
+	//	NewSeriesLimiterFactory(func() uint64 {
+	//		return uint64(ss.limits.MaxFetchedSeriesPerQuery(userID))
+	//	}),
+	//	ss.partitioners,
+	//	ss.seriesHashCache,
+	//	ss.bucketStoreMetrics,
+	//	bucketStoreOpts...,
+	//)
+	bs, err := NewParquetBucketStore(
+		userID,
+		ss.syncDirForUser(userID),
+		userBkt,
+		ss.cfg.BucketStore,
+		fetcher,
+		ss.queryGate,
+		ss.lazyLoadingGate,
+		NewChunksLimiterFactory(func() uint64 {
+			return uint64(ss.limits.MaxChunksPerQuery(userID))
+		}),
+		NewSeriesLimiterFactory(func() uint64 {
+			return uint64(ss.limits.MaxFetchedSeriesPerQuery(userID))
+		}),
+		ss.bucketStoreMetrics,
+		userLogger,
+		ss.reg,
+	)
+
+	if err != nil {
+		return nil, err
+	}
+	if err = services.StartAndAwaitRunning(ctx, bs); err != nil {
+		return nil, fmt.Errorf("starting bucket store for tenant %s: %w", userID, err)
+	}
+
+	ss.stores[userID] = bs
+	ss.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg)
+
+	return bs, nil
+}
+
+// closeBucketStoreAndDeleteLocalFilesForExcludedTenants closes the bucket stores and removes the
+// local "sync" directories for tenants that are not included in the current shard.
+func (ss *ParquetBucketStores) closeBucketStoreAndDeleteLocalFilesForExcludedTenants(includedUserIDs []string) {
+	files, err := os.ReadDir(ss.cfg.BucketStore.SyncDir)
+	if err != nil {
+		return
+	}
+
+	includedUserIDsMap := util.StringsMap(includedUserIDs)
+	for _, f := range files {
+		if !f.IsDir() {
+			continue
+		}
+
+		userID := f.Name()
+		if includedUserIDsMap[userID] {
+			// Preserve directory for users owned by this shard.
+			continue
+		}
+
+		err := ss.closeBucketStore(userID)
+		switch {
+		case errors.Is(err, errBucketStoreNotFound):
+			// This is OK, nothing was closed.
+		case err == nil:
+			level.Info(ss.logger).Log("msg", "closed bucket store for user", "user", userID)
+		default:
+			level.Warn(ss.logger).Log("msg", "failed to close bucket store for user", "user", userID, "err", err)
+		}
+
+		userSyncDir := ss.syncDirForUser(userID)
+		err = os.RemoveAll(userSyncDir)
+		if err == nil {
+			level.Info(ss.logger).Log("msg", "deleted user sync directory", "dir", userSyncDir)
+		} else {
+			level.Warn(ss.logger).Log("msg", "failed to delete user sync directory", "dir", userSyncDir, "err", err)
+		}
+	}
+}
+
+// closeBucketStore closes the bucket store for the given user and removes it from the
+// bucket stores map and metrics. If the bucket store doesn't exist, it returns
+// errBucketStoreNotFound; otherwise it returns the error from closing the bucket store.
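+// The store is removed from the map while holding storesMu, but RemoveBlocksAndClose is
+// called after the lock has been released, so closing a store doesn't block lookups of
+// other stores.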
+func (ss *ParquetBucketStores) closeBucketStore(userID string) error {
+	ss.storesMu.Lock()
+	unlockInDefer := true
+	defer func() {
+		if unlockInDefer {
+			ss.storesMu.Unlock()
+		}
+	}()
+
+	bs := ss.stores[userID]
+	if bs == nil {
+		return errBucketStoreNotFound
+	}
+
+	delete(ss.stores, userID)
+	unlockInDefer = false
+	ss.storesMu.Unlock()
+
+	ss.metaFetcherMetrics.RemoveUserRegistry(userID)
+	return bs.RemoveBlocksAndClose()
+}
+
+// countBlocksLoaded returns the total number of blocks loaded, summed for all users.
+func (ss *ParquetBucketStores) countBlocksLoaded() int {
+	total := 0
+
+	ss.storesMu.RLock()
+	defer ss.storesMu.RUnlock()
+
+	for _, store := range ss.stores {
+		stats := store.Stats()
+		total += stats.BlocksLoadedTotal
+	}
+
+	return total
+}
+
+func (ss *ParquetBucketStores) Describe(descs chan<- *prometheus.Desc) {
+	descs <- ss.blocksLoaded
+}
+
+func (ss *ParquetBucketStores) Collect(metrics chan<- prometheus.Metric) {
+	total := ss.countBlocksLoaded()
+	metrics <- prometheus.MustNewConstMetric(ss.blocksLoaded, prometheus.GaugeValue, float64(total))
+}
+
+func (ss *ParquetBucketStores) syncDirForUser(userID string) string {
+	return filepath.Join(ss.cfg.BucketStore.SyncDir, userID)
+}