diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go
index 85415a5f1a9..30970de0021 100644
--- a/pkg/storegateway/parquet_bucket_stores.go
+++ b/pkg/storegateway/parquet_bucket_stores.go
@@ -4,13 +4,24 @@ package storegateway
 
 import (
 	"context"
+	"time"
 
 	"github.com/go-kit/log"
+	"github.com/gogo/protobuf/types"
 	"github.com/grafana/dskit/services"
+	"github.com/oklog/ulid/v2"
+	"github.com/pkg/errors"
+	"github.com/prometheus-community/parquet-common/storage"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/prometheus/model/labels"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
+	"github.com/grafana/mimir/pkg/storage/sharding"
+	"github.com/grafana/mimir/pkg/storegateway/hintspb"
 	"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
 	"github.com/grafana/mimir/pkg/storegateway/storepb"
+	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
 type ParquetBucketStores struct {
@@ -18,6 +29,10 @@ type ParquetBucketStores struct {
 
 	logger log.Logger
 	reg    prometheus.Registerer
+
+	metrics              *BucketStoreMetrics // TODO: Create ParquetBucketStoreMetrics
+	chunksLimiterFactory ChunksLimiterFactory
+	seriesLimiterFactory SeriesLimiterFactory
 }
 
 // NewParquetBucketStores initializes a Parquet implementation of the Stores interface.
@@ -35,27 +50,259 @@ func NewParquetBucketStores(
 	return stores, nil
 }
 
-func (ss ParquetBucketStores) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
-	//TODO implement me
-	panic("implement me")
+func (ss ParquetBucketStores) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) (err error) {
+	if req.SkipChunks {
+		// We don't do the streaming call if we are not requesting the chunks.
+		req.StreamingChunksBatchSize = 0
+	}
+	defer func() { err = mapSeriesError(err) }()
+
+	matchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
+	if err != nil {
+		return status.Error(codes.InvalidArgument, err.Error())
+	}
+
+	// Check if matchers include the query shard selector.
+	shardSelector, matchers, err := sharding.RemoveShardFromMatchers(matchers)
+	if err != nil {
+		return status.Error(codes.InvalidArgument, errors.Wrap(err, "parse query sharding label").Error())
+	}
+
+	var (
+		spanLogger       = spanlogger.FromContext(srv.Context(), ss.logger)
+		ctx              = srv.Context()
+		stats            = newSafeQueryStats()
+		reqBlockMatchers []*labels.Matcher
+	)
+	defer ss.recordSeriesCallResult(stats)
+	defer ss.recordRequestAmbientTime(stats, time.Now())
+
+	if req.Hints != nil {
+		reqHints := &hintspb.SeriesRequestHints{}
+		if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
+			return status.Error(codes.InvalidArgument, errors.Wrap(err, "unmarshal series request hints").Error())
+		}
+
+		reqBlockMatchers, err = storepb.MatchersToPromMatchers(reqHints.BlockMatchers...)
+		if err != nil {
+			return status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request hints labels matchers").Error())
+		}
+	}
+
+	logSeriesRequestToSpan(srv.Context(), ss.logger, req.MinTime, req.MaxTime, matchers, reqBlockMatchers, shardSelector, req.StreamingChunksBatchSize)
+
+	shards := ss.openParquetShardsForReading(ctx, req.SkipChunks, req.MinTime, req.MaxTime, reqBlockMatchers, stats)
+	defer ss.closeParquetShards(shards)
+
+	// Wait for the query gate only after opening blocks. Opening blocks is usually fast (~1ms),
+	// but sometimes it can take minutes if the block isn't loaded and there is a surge in queries for unloaded blocks.
+	done, err := ss.limitConcurrentQueries(ctx, stats)
+	if err != nil {
+		return err
+	}
+	defer done()
+
+	var (
+		resHints = &hintspb.SeriesResponseHints{}
+	)
+	for _, shard := range shards {
+		resHints.AddQueriedBlock(shard.BlockID)
+		shard.MarkQueried()
+	}
+	if err := ss.sendHints(srv, resHints); err != nil {
+		return err
+	}
+
+	streamingSeriesCount := 0
+	if req.StreamingChunksBatchSize > 0 {
+		var (
+			seriesSet       storepb.SeriesSet
+			seriesLoadStart = time.Now()
+			chunksLimiter   = ss.chunksLimiterFactory(ss.metrics.queriesDropped.WithLabelValues("chunks"))
+			seriesLimiter   = ss.seriesLimiterFactory(ss.metrics.queriesDropped.WithLabelValues("series"))
+		)
+
+		// Placeholder: Create series set for streaming labels from parquet shards
+		seriesSet, err = ss.createParquetSeriesSetForLabels(ctx, req, shards, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
+		if err != nil {
+			return err
+		}
+
+		streamingSeriesCount, err = ss.sendStreamingSeriesLabelsAndStats(req, srv, stats, seriesSet)
+		if err != nil {
+			return err
+		}
+		spanLogger.DebugLog(
+			"msg", "sent streaming series",
+			"num_series", streamingSeriesCount,
+			"duration", time.Since(seriesLoadStart),
+		)
+
+		if streamingSeriesCount == 0 {
+			// There are no series to send chunks for.
+			return nil
+		}
+	}
+
+	// We create the limiter twice in the case of streaming so that we don't double count the series
+	// and hit the limit prematurely.
+	chunksLimiter := ss.chunksLimiterFactory(ss.metrics.queriesDropped.WithLabelValues("chunks"))
+	seriesLimiter := ss.seriesLimiterFactory(ss.metrics.queriesDropped.WithLabelValues("series"))
+
+	start := time.Now()
+	if req.StreamingChunksBatchSize > 0 {
+		seriesChunkIt := ss.createParquetSeriesChunksSetIterator(ctx, req, shards, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
+		err = ss.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount)
+	} else {
+		var seriesSet storepb.SeriesSet
+		seriesSet, err = ss.createParquetSeriesSetWithChunks(ctx, req, shards, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
+		if err != nil {
+			return err
+		}
+		err = ss.sendSeriesChunks(req, srv, seriesSet, stats)
+	}
+	if err != nil {
+		return
+	}
+
+	numSeries, numChunks := stats.seriesAndChunksCount()
+	debugMessage := "sent series"
+	if req.StreamingChunksBatchSize > 0 {
+		debugMessage = "sent streaming chunks"
+	}
+	spanLogger.DebugLog(
+		"msg", debugMessage,
+		"num_series", numSeries,
+		"num_chunks", numChunks,
+		"duration", time.Since(start),
+	)
+
+	if req.StreamingChunksBatchSize == 0 {
+		// Stats were not sent before, so send them now.
+		return ss.sendStats(srv, stats)
+	}
+
+	return nil
 }
 
 func (ss ParquetBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
-	//TODO implement me
+	// TODO implement me
 	panic("implement me")
 }
 
 func (ss ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
-	//TODO implement me
+	// TODO implement me
 	panic("implement me")
 }
 
 func (ss ParquetBucketStores) SyncBlocks(ctx context.Context) error {
-	//TODO implement me
+	// TODO implement me
 	panic("implement me")
 }
 
 func (ss ParquetBucketStores) scanUsers(ctx context.Context) ([]string, error) {
-	//TODO implement me
+	// TODO implement me
 	panic("implement me")
 }
+
+type parquetShardWithMetadata struct {
+	storage.ParquetShardOpener
+	BlockID ulid.ULID
+	queried bool
+}
+
+func (ps *parquetShardWithMetadata) MarkQueried() {
+	ps.queried = true
+}
+
+// Placeholder methods for parquet-specific functionality
+func (ss *ParquetBucketStores) openParquetShardsForReading(ctx context.Context, skipChunks bool, minTime, maxTime int64, reqBlockMatchers []*labels.Matcher, stats *safeQueryStats) []*parquetShardWithMetadata {
+	// TODO: Implement parquet shard discovery and opening logic
+	// This should:
+	// 1. Discover parquet shards that intersect with the time range
+	// 2. Use storage.ParquetShardOpener to open .labels.parquet and .chunks.parquet files
+	// 3. Read parquet schemas and metadata for efficient querying using shard.TSDBSchema()
+	// 4. Wrap opened ParquetShard with metadata (BlockID, queried status)
+	panic("TODO: implement openParquetShardsForReading")
+}
+
+func (ss *ParquetBucketStores) closeParquetShards(shards []*parquetShardWithMetadata) {
+	for _, shard := range shards {
+		if shard == nil {
+			continue
+		}
+		if err := shard.Close(); err != nil {
+			ss.logger.Log("msg", "failed to close parquet shard", "block_id", shard.BlockID, "err", err)
+		}
+	}
+	// Closing a shard closes both its labels and chunks parquet file handles.
+	// TODO: Release any additional per-shard resources here (e.g. cached readers)
+	// once the shard opening logic above is implemented.
+}
+
+func (ss *ParquetBucketStores) limitConcurrentQueries(ctx context.Context, stats *safeQueryStats) (func(), error) {
+	// TODO: Can potentially reuse BucketStore.limitConcurrentQueries
+	// or implement a parquet-specific version if needed
+	panic("TODO: implement limitConcurrentQueries")
+}
+
+func (ss *ParquetBucketStores) sendHints(srv storegatewaypb.StoreGateway_SeriesServer, resHints *hintspb.SeriesResponseHints) error {
+	// TODO: Implement hints sending for parquet stores
+	panic("TODO: implement sendHints")
+}
+
+func (ss *ParquetBucketStores) createParquetSeriesSetForLabels(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetShardWithMetadata, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) (storepb.SeriesSet, error) {
+	// TODO: Implement parquet series set creation for labels phase
+	// This should:
+	// 1. "Stream read" .labels.parquet files from shards using shard.LabelsFile()
+	// 2. Create and return a storepb.SeriesSet that iterates over series labels without chunks
+	// Note that storepb.SeriesSet assumes series are ordered.
+	panic("TODO: implement createParquetSeriesSetForLabels")
+}
+
+func (ss *ParquetBucketStores) sendStreamingSeriesLabelsAndStats(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer, stats *safeQueryStats, seriesSet storepb.SeriesSet) (int, error) {
+	// TODO: Can potentially reuse BucketStore.sendStreamingSeriesLabelsAndStats
+	// or implement a parquet-specific version if needed
+	panic("TODO: implement sendStreamingSeriesLabelsAndStats")
+}
+
+func (ss *ParquetBucketStores) createParquetSeriesChunksSetIterator(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetShardWithMetadata, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) iterator[seriesChunksSet] {
+	// TODO: Implement parquet series chunks iterator creation
+	// This should:
+	// 1. Stream read .chunks.parquet files from shards using shard.ChunksFile()
+	// 2. Return an iterator[seriesChunksSet] (or the new iterator Nico is working on in his PR) that streams chunks for the series discovered in the labels phase
+	panic("TODO: implement createParquetSeriesChunksSetIterator")
+}
+
+func (ss *ParquetBucketStores) sendStreamingChunks(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer, seriesChunkIt iterator[seriesChunksSet], stats *safeQueryStats, streamingSeriesCount int) error {
+	// TODO: Can potentially reuse BucketStore.sendStreamingChunks
+	// or implement a parquet-specific version if needed
+	panic("TODO: implement sendStreamingChunks")
+}
+
+func (ss *ParquetBucketStores) createParquetSeriesSetWithChunks(ctx context.Context, req *storepb.SeriesRequest, shards []*parquetShardWithMetadata, shardSelector *sharding.ShardSelector, matchers []*labels.Matcher, chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter, stats *safeQueryStats) (storepb.SeriesSet, error) {
+	// TODO: Implement parquet series set creation for non-streaming requests
+	// This should create a series set that returns labels and chunks in one pass; it covers the non-streaming path, including SkipChunks=true requests (which force StreamingChunksBatchSize to 0).
+	panic("TODO: implement createParquetSeriesSetWithChunks")
+}
+
+func (ss *ParquetBucketStores) sendSeriesChunks(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer, seriesSet storepb.SeriesSet, stats *safeQueryStats) error {
+	// TODO: Can potentially reuse BucketStore.sendSeriesChunks
+	// or implement a parquet-specific version if needed
+	panic("TODO: implement sendSeriesChunks")
+}
+
+func (ss *ParquetBucketStores) sendStats(srv storegatewaypb.StoreGateway_SeriesServer, stats *safeQueryStats) error {
+	// TODO: Implement stats sending for parquet stores
+	panic("TODO: implement sendStats")
+}
+
+func (ss *ParquetBucketStores) recordSeriesCallResult(stats *safeQueryStats) {
+	// TODO: Implement series call result recording for parquet stores
+	panic("TODO: implement recordSeriesCallResult")
+}
+
+func (ss *ParquetBucketStores) recordRequestAmbientTime(stats *safeQueryStats, startTime time.Time) {
+	// TODO: Implement request ambient time recording for parquet stores
+	panic("TODO: implement recordRequestAmbientTime")
+}
diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go
new file mode 100644
index 00000000000..0739ba5a7b7
--- /dev/null
+++ b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go
@@ -0,0 +1,229 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"context"
+	"os"
+	"sync"
+
+	"github.com/hashicorp/go-multierror"
+	"github.com/parquet-go/parquet-go"
+	"github.com/thanos-io/objstore"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/prometheus-community/parquet-common/schema"
+)
+
+var DefaultShardOptions = shardOptions{
+	optimisticReader: true,
+}
+
+type shardOptions struct {
+	fileOptions      []parquet.FileOption
+	optimisticReader bool
+}
+
+type ParquetFile struct {
+	*parquet.File
+	ReadAtWithContextCloser
+	BloomFiltersLoaded bool
+
+	optimisticReader bool
+}
+
+type ShardOption func(*shardOptions)
+
+func WithFileOptions(fileOptions ...parquet.FileOption) ShardOption {
+	return func(opts *shardOptions) {
+		opts.fileOptions = append(opts.fileOptions, fileOptions...)
+	}
+}
+
+func WithOptimisticReader(optimisticReader bool) ShardOption {
+	return func(opts *shardOptions) {
+		opts.optimisticReader = optimisticReader
+	}
+}
+
+func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk, pagesToRead ...int) (*parquet.FilePages, error) {
+	colChunk := cc.(*parquet.FileColumnChunk)
+	reader := f.WithContext(ctx)
+
+	if len(pagesToRead) > 0 && f.optimisticReader {
+		offset, err := cc.OffsetIndex()
+		if err != nil {
+			return nil, err
+		}
+		minOffset := offset.Offset(pagesToRead[0])
+		maxOffset := offset.Offset(pagesToRead[len(pagesToRead)-1]) + offset.CompressedPageSize(pagesToRead[len(pagesToRead)-1])
+		reader = newOptimisticReaderAt(reader, minOffset, maxOffset)
+	}
+
+	pages := colChunk.PagesFrom(reader)
+	return pages, nil
+}
+
+func Open(ctx context.Context, r ReadAtWithContextCloser, size int64, opts ...ShardOption) (*ParquetFile, error) {
+	cfg := DefaultShardOptions
+
+	for _, opt := range opts {
+		opt(&cfg)
+	}
+
+	c, err := parquet.NewFileConfig(cfg.fileOptions...)
+	if err != nil {
+		return nil, err
+	}
+
+	file, err := parquet.OpenFile(r.WithContext(ctx), size, cfg.fileOptions...)
+	if err != nil {
+		return nil, err
+	}
+
+	return &ParquetFile{
+		File:                    file,
+		ReadAtWithContextCloser: r,
+		BloomFiltersLoaded:      !c.SkipBloomFilters,
+		optimisticReader:        cfg.optimisticReader,
+	}, nil
+}
+
+func OpenFromBucket(ctx context.Context, bkt objstore.BucketReader, name string, opts ...ShardOption) (*ParquetFile, error) {
+	attr, err := bkt.Attributes(ctx, name)
+	if err != nil {
+		return nil, err
+	}
+
+	r := NewBucketReadAt(name, bkt)
+	return Open(ctx, r, attr.Size, opts...)
+}
+
+func OpenFromFile(ctx context.Context, path string, opts ...ShardOption) (*ParquetFile, error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	stat, err := f.Stat()
+	if err != nil {
+		_ = f.Close()
+		return nil, err
+	}
+	r := NewFileReadAt(f)
+	pf, err := Open(ctx, r, stat.Size(), opts...)
+	if err != nil {
+		_ = r.Close()
+		return nil, err
+	}
+	// At this point, the file's lifecycle is managed by the ParquetFile
+	return pf, nil
+}
+
+type ParquetShard interface {
+	LabelsFile() *ParquetFile
+	ChunksFile() *ParquetFile
+	TSDBSchema() (*schema.TSDBSchema, error)
+}
+
+type ParquetOpener interface {
+	Open(ctx context.Context, path string, opts ...ShardOption) (*ParquetFile, error)
+}
+
+type ParquetBucketOpener struct {
+	bkt objstore.BucketReader
+}
+
+func NewParquetBucketOpener(bkt objstore.BucketReader) *ParquetBucketOpener {
+	return &ParquetBucketOpener{
+		bkt: bkt,
+	}
+}
+
+func (o *ParquetBucketOpener) Open(ctx context.Context, name string, opts ...ShardOption) (*ParquetFile, error) {
+	return OpenFromBucket(ctx, o.bkt, name, opts...)
+}
+
+type ParquetLocalFileOpener struct{}
+
+func NewParquetLocalFileOpener() *ParquetLocalFileOpener {
+	return &ParquetLocalFileOpener{}
+}
+
+func (o *ParquetLocalFileOpener) Open(ctx context.Context, name string, opts ...ShardOption) (*ParquetFile, error) {
+	return OpenFromFile(ctx, name, opts...)
+}
+
+type ParquetShardOpener struct {
+	labelsFile, chunksFile *ParquetFile
+	schema                 *schema.TSDBSchema
+	o                      sync.Once
+}
+
+func NewParquetShardOpener(
+	ctx context.Context,
+	name string,
+	labelsFileOpener ParquetOpener,
+	chunksFileOpener ParquetOpener,
+	shard int,
+	opts ...ShardOption,
+) (*ParquetShardOpener, error) {
+	labelsFileName := schema.LabelsPfileNameForShard(name, shard)
+	chunksFileName := schema.ChunksPfileNameForShard(name, shard)
+
+	errGroup := errgroup.Group{}
+
+	var labelsFile, chunksFile *ParquetFile
+
+	errGroup.Go(func() (err error) {
+		labelsFile, err = labelsFileOpener.Open(ctx, labelsFileName, opts...)
+		return err
+	})
+
+	errGroup.Go(func() (err error) {
+		chunksFile, err = chunksFileOpener.Open(ctx, chunksFileName, opts...)
+		return err
+	})
+
+	if err := errGroup.Wait(); err != nil {
+		return nil, err
+	}
+
+	return &ParquetShardOpener{
+		labelsFile: labelsFile,
+		chunksFile: chunksFile,
+	}, nil
+}
+
+func (s *ParquetShardOpener) LabelsFile() *ParquetFile {
+	return s.labelsFile
+}
+
+func (s *ParquetShardOpener) ChunksFile() *ParquetFile {
+	return s.chunksFile
+}
+
+func (s *ParquetShardOpener) TSDBSchema() (*schema.TSDBSchema, error) {
+	var err error
+	s.o.Do(func() {
+		s.schema, err = schema.FromLabelsFile(s.labelsFile.File)
+	})
+	return s.schema, err
+}
+
+func (s *ParquetShardOpener) Close() error {
+	err := &multierror.Error{}
+	err = multierror.Append(err, s.labelsFile.Close())
+	err = multierror.Append(err, s.chunksFile.Close())
+	return err.ErrorOrNil()
+}
diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go
new file mode 100644
index 00000000000..de085b62a7c
--- /dev/null
+++ b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go
@@ -0,0 +1,111 @@
+// Copyright The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"context"
+	"io"
+	"os"
+
+	"github.com/thanos-io/objstore"
+)
+
+type ReadAtWithContextCloser interface {
+	io.Closer
+	WithContext(ctx context.Context) io.ReaderAt
+}
+
+type fileReadAt struct {
+	*os.File
+}
+
+// NewFileReadAt returns a ReadAtCloserWithContext for reading from a local file.
+func NewFileReadAt(f *os.File) ReadAtWithContextCloser {
+	return &fileReadAt{
+		File: f,
+	}
+}
+
+func (f *fileReadAt) WithContext(_ context.Context) io.ReaderAt {
+	return f.File
+}
+
+type bReadAt struct {
+	path string
+	obj  objstore.BucketReader
+}
+
+// NewBucketReadAt returns a ReadAtWithContextCloser for reading from a bucket.
+func NewBucketReadAt(path string, obj objstore.BucketReader) ReadAtWithContextCloser {
+	return &bReadAt{
+		path: path,
+		obj:  obj,
+	}
+}
+
+func (b *bReadAt) WithContext(ctx context.Context) io.ReaderAt {
+	return readAtFunc{
+		f: func(p []byte, off int64) (n int, err error) {
+			rc, err := b.obj.GetRange(ctx, b.path, off, int64(len(p)))
+			if err != nil {
+				return 0, err
+			}
+			defer func() { _ = rc.Close() }()
+			n, err = io.ReadFull(rc, p)
+			if err == io.EOF {
+				err = nil
+			}
+			return
+		},
+	}
+}
+
+func (b *bReadAt) Close() error {
+	return nil
+}
+
+type readAtFunc struct {
+	f func([]byte, int64) (n int, err error)
+}
+
+func (r readAtFunc) ReadAt(p []byte, off int64) (n int, err error) {
+	return r.f(p, off)
+}
+
+type optimisticReaderAt struct {
+	r      io.ReaderAt
+	b      []byte
+	offset int64
+}
+
+func (b optimisticReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
+	if off >= b.offset && off < b.offset+int64(len(b.b)) {
+		diff := off - b.offset
+		n := copy(p, b.b[diff:])
+		return n, nil
+	}
+
+	return b.r.ReadAt(p, off)
+}
+
+func newOptimisticReaderAt(r io.ReaderAt, minOffset, maxOffset int64) io.ReaderAt {
+	if minOffset < maxOffset {
+		b := make([]byte, maxOffset-minOffset)
+		n, err := r.ReadAt(b, minOffset)
+		if err == nil {
+			return &optimisticReaderAt{r: r, b: b[:n], offset: minOffset}
+		}
+	}
+	return r
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index a458e14c896..7f8649c4991 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1143,6 +1143,7 @@ github.com/power-devops/perfstat
 ## explicit; go 1.23.4
 github.com/prometheus-community/parquet-common/convert
 github.com/prometheus-community/parquet-common/schema
+github.com/prometheus-community/parquet-common/storage
 github.com/prometheus-community/parquet-common/util
 # github.com/prometheus/alertmanager v0.28.1 => github.com/grafana/prometheus-alertmanager v0.25.1-0.20250604130045-92c8f6389b36
 ## explicit; go 1.23.0
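
Editor's note (sketch, not part of the diff): the TODO in openParquetShardsForReading points at storage.ParquetShardOpener from the vendored package above. The fragment below is a minimal, hypothetical sketch of opening a single parquet shard of one block from object storage and reading its TSDB schema; the helper name openParquetShard, the shardIdx parameter, and the bkt/blockID arguments are placeholders, and block discovery, time-range filtering, and query stats are intentionally omitted.

package storegateway

import (
	"context"

	"github.com/oklog/ulid/v2"
	"github.com/prometheus-community/parquet-common/schema"
	"github.com/prometheus-community/parquet-common/storage"
	"github.com/thanos-io/objstore"
)

// openParquetShard is a hypothetical helper (not part of this PR) showing how a
// single parquet shard of a converted block could be opened with the vendored
// parquet-common storage API.
func openParquetShard(ctx context.Context, bkt objstore.BucketReader, blockID ulid.ULID, shardIdx int) (*storage.ParquetShardOpener, *schema.TSDBSchema, error) {
	// The same bucket-backed opener serves both the labels and the chunks file.
	opener := storage.NewParquetBucketOpener(bkt)

	// NewParquetShardOpener opens the shard's labels and chunks parquet files
	// concurrently; the object names are derived via schema.LabelsPfileNameForShard
	// and schema.ChunksPfileNameForShard.
	shard, err := storage.NewParquetShardOpener(ctx, blockID.String(), opener, opener, shardIdx)
	if err != nil {
		return nil, nil, err
	}

	// TSDBSchema lazily parses the schema from the labels file and caches it.
	tsdbSchema, err := shard.TSDBSchema()
	if err != nil {
		_ = shard.Close()
		return nil, nil, err
	}

	return shard, tsdbSchema, nil
}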
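Editor's note (sketch, not part of the diff): the labels-phase TODO in createParquetSeriesSetForLabels mentions stream-reading the .labels.parquet file via shard.LabelsFile(). Below is a hypothetical fragment showing how specific pages of a column chunk could be fetched through ParquetFile.GetPages, which coalesces the requested pages into a single range read when the optimistic reader is enabled (the default). The function name firstPageValueCount and the row-group/column indexes are illustrative only.

package storegateway

import (
	"context"

	"github.com/prometheus-community/parquet-common/storage"
)

// firstPageValueCount is a hypothetical fragment (not part of this PR): it counts
// the values in the first page of the first column chunk of a labels parquet file.
func firstPageValueCount(ctx context.Context, pf *storage.ParquetFile) (int64, error) {
	// RowGroups/ColumnChunks are promoted from the embedded *parquet.File.
	cc := pf.RowGroups()[0].ColumnChunks()[0]

	// Passing explicit page indexes lets GetPages compute the byte range of those
	// pages and serve it with one object-storage range read via the optimistic reader.
	pages, err := pf.GetPages(ctx, cc, 0)
	if err != nil {
		return 0, err
	}
	defer func() { _ = pages.Close() }()

	page, err := pages.ReadPage()
	if err != nil {
		return 0, err
	}
	return page.NumValues(), nil
}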