diff --git a/pkg/storage/parquet/block/lazy_reader.go b/pkg/storage/parquet/block/lazy_reader.go index 5e22816ce12..55c4b1db69d 100644 --- a/pkg/storage/parquet/block/lazy_reader.go +++ b/pkg/storage/parquet/block/lazy_reader.go @@ -10,6 +10,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "sync" "time" @@ -103,6 +104,9 @@ type LazyReaderLocalLabelsBucketChunks struct { blockID ulid.ULID shardIdx int + // earlyValidation indicates whether the labels file should be validated as + // soon as it is downloaded to local disk. + earlyValidation bool // bkt to download the labels file to local disk and open the chunks file from the bucket bkt objstore.InstrumentedBucketReader @@ -135,14 +139,16 @@ func NewLazyReaderLocalLabelsBucketChunks( metrics *LazyParquetReaderMetrics, onClosed func(*LazyReaderLocalLabelsBucketChunks), lazyLoadingGate gate.Gate, + earlyValidation bool, logger log.Logger, ) (*LazyReaderLocalLabelsBucketChunks, error) { reader := &LazyReaderLocalLabelsBucketChunks{ - ctx: ctx, - blockID: blockID, - shardIdx: FirstShardIndex, - bkt: bkt, - localDir: localDir, + ctx: ctx, + blockID: blockID, + shardIdx: FirstShardIndex, + earlyValidation: earlyValidation, + bkt: bkt, + localDir: localDir, metrics: metrics, usedAt: atomic.NewInt64(0), @@ -175,7 +181,8 @@ func (r *LazyReaderLocalLabelsBucketChunks) labelsFileName() string { } func (r *LazyReaderLocalLabelsBucketChunks) labelsFileLocalPath() string { - return filepath.Join(r.localDir, r.labelsFileName()) + // trim labels filename prefix before joining as it includes the block ID already + return filepath.Join(r.localDir, strings.TrimPrefix(r.labelsFileName(), r.blockID.String())) } func (r *LazyReaderLocalLabelsBucketChunks) ensureLabelsFileToLocalDisk() error { @@ -189,10 +196,16 @@ func (r *LazyReaderLocalLabelsBucketChunks) ensureLabelsFileToLocalDisk() error return errors.Wrap(err, "read parquet labels file from disk") } - level.Debug(r.logger).Log("msg", "parquet labels file not on disk; loading", "path", localPath) + level.Debug(r.logger).Log("msg", "parquet labels file not on disk; loading", "path", localPath, "validating", r.earlyValidation) start := time.Now() - if err := r.convertLabelsFileToLocalDisk(); err != nil { - return errors.Wrap(err, "write labels file") + if r.earlyValidation { + if err := r.convertLabelsFileToLocalDisk(); err != nil { + return errors.Wrap(err, "download and validate labels file") + } + } else { + if err := r.downloadLabelsFileToLocalDisk(); err != nil { + return errors.Wrap(err, "download labels file") + } } level.Debug(r.logger).Log("msg", "loaded parquet labels file to disk", "path", localPath, "elapsed", time.Since(start)) return nil @@ -235,6 +248,35 @@ func (r *LazyReaderLocalLabelsBucketChunks) convertLabelsFileToLocalDisk() error return nil } +// downloadLabelsFileToLocalDisk downloads the labels file from the bucket to +// local disk. It does not parse or validate the file in any way, it simply +// downloads it as-is. +func (r *LazyReaderLocalLabelsBucketChunks) downloadLabelsFileToLocalDisk() error { + reader, err := r.bkt.Get(r.ctx, r.labelsFileName()) + if err != nil { + return errors.Wrap(err, "download parquet labels file from bucket") + } + defer runutil.CloseWithLogOnErr(r.logger, reader, "close bucket labels reader") + + outPath := r.labelsFileLocalPath() + outPathDir := filepath.Dir(outPath) + err = os.MkdirAll(outPathDir, os.ModePerm) + if err != nil { + return errors.Wrap(err, "create local directory") + } + + f, err := os.Create(outPath) + if err != nil { + return errors.Wrap(err, "create local file and open for write") + } + defer runutil.CloseWithLogOnErr(r.logger, f, "close local parquet labels file") + + if _, err := f.ReadFrom(reader); err != nil { + return errors.Wrap(err, "read parquet labels file from bucket to local file") + } + return nil +} + func (r *LazyReaderLocalLabelsBucketChunks) LabelsFile() *storage.ParquetFile { loaded := r.getOrLoadReader(r.ctx) return loaded.reader.LabelsFile() diff --git a/pkg/storage/parquet/block/reader_pool.go b/pkg/storage/parquet/block/reader_pool.go index 378219f35dc..a80e87ed940 100644 --- a/pkg/storage/parquet/block/reader_pool.go +++ b/pkg/storage/parquet/block/reader_pool.go @@ -43,10 +43,11 @@ func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics { type ReaderPool struct { services.Service - lazyReaderEnabled bool - lazyReaderIdleTimeout time.Duration - logger log.Logger - metrics *ReaderPoolMetrics + lazyReaderEnabled bool + earlyValidationEnabled bool + lazyReaderIdleTimeout time.Duration + logger log.Logger + metrics *ReaderPoolMetrics // Gate used to limit the number of concurrent index-header loads. lazyLoadingGate gate.Gate @@ -81,12 +82,13 @@ func newReaderPool( metrics *ReaderPoolMetrics, ) *ReaderPool { return &ReaderPool{ - logger: logger, - metrics: metrics, - lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled, - lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout, - lazyReaders: make(map[*LazyReaderLocalLabelsBucketChunks]struct{}), - lazyLoadingGate: lazyLoadingGate, + logger: logger, + metrics: metrics, + lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled, + earlyValidationEnabled: false, + lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout, + lazyReaders: make(map[*LazyReaderLocalLabelsBucketChunks]struct{}), + lazyLoadingGate: lazyLoadingGate, } } @@ -111,6 +113,7 @@ func (p *ReaderPool) GetReader( p.metrics.lazyReader, p.onLazyReaderClosed, p.lazyLoadingGate, + p.earlyValidationEnabled, logger, )