Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions pkg/storage/parquet/block/lazy_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -103,6 +104,9 @@ type LazyReaderLocalLabelsBucketChunks struct {

blockID ulid.ULID
shardIdx int
// earlyValidation indicates whether the labels file should be validated as
// soon as it is downloaded to local disk.
earlyValidation bool

// bkt to download the labels file to local disk and open the chunks file from the bucket
bkt objstore.InstrumentedBucketReader
Expand Down Expand Up @@ -135,14 +139,16 @@ func NewLazyReaderLocalLabelsBucketChunks(
metrics *LazyParquetReaderMetrics,
onClosed func(*LazyReaderLocalLabelsBucketChunks),
lazyLoadingGate gate.Gate,
earlyValidation bool,
logger log.Logger,
) (*LazyReaderLocalLabelsBucketChunks, error) {
reader := &LazyReaderLocalLabelsBucketChunks{
ctx: ctx,
blockID: blockID,
shardIdx: FirstShardIndex,
bkt: bkt,
localDir: localDir,
ctx: ctx,
blockID: blockID,
shardIdx: FirstShardIndex,
earlyValidation: earlyValidation,
bkt: bkt,
localDir: localDir,

metrics: metrics,
usedAt: atomic.NewInt64(0),
Expand Down Expand Up @@ -175,7 +181,8 @@ func (r *LazyReaderLocalLabelsBucketChunks) labelsFileName() string {
}

// labelsFileLocalPath returns the path of the labels file on local disk: the
// labels file name, with a leading block ID removed, joined onto localDir.
func (r *LazyReaderLocalLabelsBucketChunks) labelsFileLocalPath() string {
	// Trim the block ID prefix from the labels file name before joining, as the
	// name includes the block ID already.
	// NOTE(review): presumably localDir is already specific to this block, so
	// keeping the prefix would repeat the block ID in the path — confirm.
	name := strings.TrimPrefix(r.labelsFileName(), r.blockID.String())
	return filepath.Join(r.localDir, name)
}

func (r *LazyReaderLocalLabelsBucketChunks) ensureLabelsFileToLocalDisk() error {
Expand All @@ -189,10 +196,16 @@ func (r *LazyReaderLocalLabelsBucketChunks) ensureLabelsFileToLocalDisk() error
return errors.Wrap(err, "read parquet labels file from disk")
}

level.Debug(r.logger).Log("msg", "parquet labels file not on disk; loading", "path", localPath)
level.Debug(r.logger).Log("msg", "parquet labels file not on disk; loading", "path", localPath, "validating", r.earlyValidation)
start := time.Now()
if err := r.convertLabelsFileToLocalDisk(); err != nil {
return errors.Wrap(err, "write labels file")
if r.earlyValidation {
if err := r.convertLabelsFileToLocalDisk(); err != nil {
return errors.Wrap(err, "download and validate labels file")
}
} else {
if err := r.downloadLabelsFileToLocalDisk(); err != nil {
return errors.Wrap(err, "download labels file")
}
}
level.Debug(r.logger).Log("msg", "loaded parquet labels file to disk", "path", localPath, "elapsed", time.Since(start))
return nil
Expand Down Expand Up @@ -235,6 +248,35 @@ func (r *LazyReaderLocalLabelsBucketChunks) convertLabelsFileToLocalDisk() error
return nil
}

// downloadLabelsFileToLocalDisk downloads the labels file from the bucket to
// local disk. It does not parse or validate the file in any way; it simply
// copies the object's bytes as-is to labelsFileLocalPath(), creating the parent
// directory if needed.
//
// If the copy or the final close of the local file fails, the incomplete file
// is removed before the error is returned, so that a truncated labels file is
// not left behind at the final path.
func (r *LazyReaderLocalLabelsBucketChunks) downloadLabelsFileToLocalDisk() error {
	reader, err := r.bkt.Get(r.ctx, r.labelsFileName())
	if err != nil {
		return errors.Wrap(err, "download parquet labels file from bucket")
	}
	defer runutil.CloseWithLogOnErr(r.logger, reader, "close bucket labels reader")

	outPath := r.labelsFileLocalPath()
	if err := os.MkdirAll(filepath.Dir(outPath), os.ModePerm); err != nil {
		return errors.Wrap(err, "create local directory")
	}

	f, err := os.Create(outPath)
	if err != nil {
		return errors.Wrap(err, "create local file and open for write")
	}

	// removePartial deletes the incomplete output file and returns cause. If the
	// removal itself fails, that is added to the returned error because the
	// truncated file is then still on disk.
	removePartial := func(cause error) error {
		if rmErr := os.Remove(outPath); rmErr != nil && !os.IsNotExist(rmErr) {
			return errors.Wrapf(cause, "partial local file left on disk (remove failed: %v)", rmErr)
		}
		return cause
	}

	if _, err := f.ReadFrom(reader); err != nil {
		// The copy error is the one worth returning; a close error here is only logged.
		runutil.CloseWithLogOnErr(r.logger, f, "close local parquet labels file")
		return removePartial(errors.Wrap(err, "read parquet labels file from bucket to local file"))
	}

	// Close explicitly rather than in a defer: a failed close of a file that was
	// just written can mean the data did not reach disk, so it must fail the download.
	if err := f.Close(); err != nil {
		return removePartial(errors.Wrap(err, "close local parquet labels file"))
	}
	return nil
}

func (r *LazyReaderLocalLabelsBucketChunks) LabelsFile() *storage.ParquetFile {
loaded := r.getOrLoadReader(r.ctx)
return loaded.reader.LabelsFile()
Expand Down
23 changes: 13 additions & 10 deletions pkg/storage/parquet/block/reader_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ func NewReaderPoolMetrics(reg prometheus.Registerer) *ReaderPoolMetrics {
type ReaderPool struct {
services.Service

lazyReaderEnabled bool
lazyReaderIdleTimeout time.Duration
logger log.Logger
metrics *ReaderPoolMetrics
lazyReaderEnabled bool
earlyValidationEnabled bool
lazyReaderIdleTimeout time.Duration
logger log.Logger
metrics *ReaderPoolMetrics

// Gate used to limit the number of concurrent index-header loads.
lazyLoadingGate gate.Gate
Expand Down Expand Up @@ -81,12 +82,13 @@ func newReaderPool(
metrics *ReaderPoolMetrics,
) *ReaderPool {
return &ReaderPool{
logger: logger,
metrics: metrics,
lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled,
lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout,
lazyReaders: make(map[*LazyReaderLocalLabelsBucketChunks]struct{}),
lazyLoadingGate: lazyLoadingGate,
logger: logger,
metrics: metrics,
lazyReaderEnabled: indexHeaderConfig.LazyLoadingEnabled,
earlyValidationEnabled: false,
lazyReaderIdleTimeout: indexHeaderConfig.LazyLoadingIdleTimeout,
lazyReaders: make(map[*LazyReaderLocalLabelsBucketChunks]struct{}),
lazyLoadingGate: lazyLoadingGate,
}
}

Expand All @@ -111,6 +113,7 @@ func (p *ReaderPool) GetReader(
p.metrics.lazyReader,
p.onLazyReaderClosed,
p.lazyLoadingGate,
p.earlyValidationEnabled,
logger,
)

Expand Down