Skip to content

Commit 05aa833

Browse files
committed
skip blocks with out-of-order chunk during compaction
Signed-off-by: Yang Hu <[email protected]>
1 parent 83419bc commit 05aa833

File tree

10 files changed

+309
-82
lines changed

10 files changed

+309
-82
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
1313
### Added
1414
- [#4453](https://github.com/thanos-io/thanos/pull/4453) Tools: Add flag `--selector.relabel-config-file` / `--selector.relabel-config` / `--max-time` / `--min-time` to filter served blocks.
1515
- [#4482](https://github.com/thanos-io/thanos/pull/4482) COS: Add http_config for cos object store client.
16+
- [#4469](https://github.com/thanos-io/thanos/pull/4469) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting.
1617

1718
### Fixed
1819

cmd/thanos/compact.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com
147147
m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
148148
Name: "thanos_compact_blocks_marked_total",
149149
Help: "Total number of blocks marked in compactor.",
150-
}, []string{"marker"})
151-
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename)
150+
}, []string{"marker", "reason"})
151+
m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
152152
m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)
153153

154154
m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
@@ -349,13 +349,15 @@ func runCompact(
349349
reg,
350350
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
351351
compactMetrics.garbageCollectedBlocks,
352+
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
352353
metadata.HashFunc(conf.hashFunc),
354+
conf.skipBlockWithOutOfOrderChunks,
353355
)
354356
planner := compact.WithLargeTotalIndexSizeFilter(
355357
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
356358
bkt,
357359
int64(conf.maxBlockIndexSize),
358-
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
360+
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
359361
)
360362
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
361363
compactor, err := compact.NewBucketCompactor(
@@ -585,6 +587,7 @@ type compactConfig struct {
585587
hashFunc string
586588
enableVerticalCompaction bool
587589
dedupFunc string
590+
skipBlockWithOutOfOrderChunks bool
588591
}
589592

590593
func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -668,6 +671,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
668671
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
669672
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)
670673

674+
cmd.Flag("compact.skip-block-with-out-of-order-chunks", "When set to true, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction").
675+
Hidden().Default("false").BoolVar(&cc.skipBlockWithOutOfOrderChunks)
676+
671677
cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
672678
Default("").EnumVar(&cc.hashFunc, "SHA256", "")
673679

docs/components/compact.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,10 @@ Flags:
301301
happen at the end of an iteration.
302302
--compact.concurrency=1 Number of goroutines to use when compacting
303303
groups.
304+
--compact.skip-block-with-out-of-order-chunks=false
305+
Mark blocks containing index with out-of-order
306+
chunks for no compact instead of halting the
307+
compaction.
304308
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
305309
before they are being processed. Malformed
306310
blocks older than the maximum of

pkg/block/index.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error {
111111
return nil
112112
}
113113

114-
// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure.
115-
func (i HealthStats) CriticalErr() error {
116-
var errMsg []string
117-
118-
if i.OutOfOrderSeries > 0 {
119-
errMsg = append(errMsg, fmt.Sprintf(
114+
func (i HealthStats) OutOfOrderChunksErr() error {
115+
if i.OutOfOrderChunks > 0 {
116+
return errors.New(fmt.Sprintf(
120117
"%d/%d series have an average of %.3f out-of-order chunks: "+
121118
"%.3f of these are exact duplicates (in terms of data and time range)",
122119
i.OutOfOrderSeries,
@@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error {
125122
float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks),
126123
))
127124
}
125+
return nil
126+
}
127+
128+
// CriticalErr returns an error if the stats indicate a critical block issue that might be solved only by a manual repair procedure.
129+
func (i HealthStats) CriticalErr() error {
130+
var errMsg []string
128131

129132
n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks)
130133
if n > 0 {
@@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error {
158161
errMsg = append(errMsg, err.Error())
159162
}
160163

164+
if err := i.OutOfOrderChunksErr(); err != nil {
165+
errMsg = append(errMsg, err.Error())
166+
}
167+
161168
if len(errMsg) > 0 {
162169
return errors.New(strings.Join(errMsg, ", "))
163170
}

pkg/block/index_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package block
66
import (
77
"context"
88
"io/ioutil"
9+
"math"
910
"os"
1011
"path/filepath"
1112
"testing"
@@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) {
8384
testutil.Ok(t, ir2.Series(p.At(), &lset, &chks))
8485
testutil.Equals(t, 1, len(chks))
8586
}
87+
}
88+
89+
func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
90+
blockDir, err := ioutil.TempDir("", "test-ooo-index")
91+
testutil.Ok(t, err)
8692

93+
err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64)
94+
testutil.Ok(t, err)
95+
96+
stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64)
97+
98+
testutil.Ok(t, err)
99+
testutil.Equals(t, 1, stats.OutOfOrderChunks)
100+
testutil.NotOk(t, stats.OutOfOrderChunksErr())
87101
}

pkg/block/metadata/markers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ const (
6767
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
6868
// This reason can be ignored when vertical block sharding will be implemented.
6969
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
70+
// OutOfOrderChunksNoCompactReason is the reason for marking a block as no-compact when its index contains out-of-order chunks, so that compaction is not blocked.
71+
OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
7072
)
7173

7274
// NoCompactMark marker stores reason of block being excluded from compaction if needed.

pkg/compact/compact.go

Lines changed: 101 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -229,18 +229,20 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
229229
// DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample
230230
// resolution and block's labels.
231231
type DefaultGrouper struct {
232-
bkt objstore.Bucket
233-
logger log.Logger
234-
acceptMalformedIndex bool
235-
enableVerticalCompaction bool
236-
compactions *prometheus.CounterVec
237-
compactionRunsStarted *prometheus.CounterVec
238-
compactionRunsCompleted *prometheus.CounterVec
239-
compactionFailures *prometheus.CounterVec
240-
verticalCompactions *prometheus.CounterVec
241-
garbageCollectedBlocks prometheus.Counter
242-
blocksMarkedForDeletion prometheus.Counter
243-
hashFunc metadata.HashFunc
232+
bkt objstore.Bucket
233+
logger log.Logger
234+
acceptMalformedIndex bool
235+
enableVerticalCompaction bool
236+
compactions *prometheus.CounterVec
237+
compactionRunsStarted *prometheus.CounterVec
238+
compactionRunsCompleted *prometheus.CounterVec
239+
compactionFailures *prometheus.CounterVec
240+
verticalCompactions *prometheus.CounterVec
241+
garbageCollectedBlocks prometheus.Counter
242+
blocksMarkedForDeletion prometheus.Counter
243+
blocksMarkedForNoCompact prometheus.Counter
244+
hashFunc metadata.HashFunc
245+
skipChunksWithOutOfOrderBlocks bool
244246
}
245247

246248
// NewDefaultGrouper makes a new DefaultGrouper.
@@ -252,7 +254,9 @@ func NewDefaultGrouper(
252254
reg prometheus.Registerer,
253255
blocksMarkedForDeletion prometheus.Counter,
254256
garbageCollectedBlocks prometheus.Counter,
257+
blocksMarkedForNoCompact prometheus.Counter,
255258
hashFunc metadata.HashFunc,
259+
skipChunksWithOutOfOrderBlocks bool,
256260
) *DefaultGrouper {
257261
return &DefaultGrouper{
258262
bkt: bkt,
@@ -279,9 +283,11 @@ func NewDefaultGrouper(
279283
Name: "thanos_compact_group_vertical_compactions_total",
280284
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
281285
}, []string{"group"}),
282-
garbageCollectedBlocks: garbageCollectedBlocks,
283-
blocksMarkedForDeletion: blocksMarkedForDeletion,
284-
hashFunc: hashFunc,
286+
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
287+
garbageCollectedBlocks: garbageCollectedBlocks,
288+
blocksMarkedForDeletion: blocksMarkedForDeletion,
289+
hashFunc: hashFunc,
290+
skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
285291
}
286292
}
287293

@@ -309,7 +315,9 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
309315
g.verticalCompactions.WithLabelValues(groupKey),
310316
g.garbageCollectedBlocks,
311317
g.blocksMarkedForDeletion,
318+
g.blocksMarkedForNoCompact,
312319
g.hashFunc,
320+
g.skipChunksWithOutOfOrderBlocks,
313321
)
314322
if err != nil {
315323
return nil, errors.Wrap(err, "create compaction group")
@@ -330,23 +338,25 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
330338
// Group captures a set of blocks that have the same origin labels and downsampling resolution.
331339
// Those blocks generally contain the same series and can thus efficiently be compacted.
332340
type Group struct {
333-
logger log.Logger
334-
bkt objstore.Bucket
335-
key string
336-
labels labels.Labels
337-
resolution int64
338-
mtx sync.Mutex
339-
metasByMinTime []*metadata.Meta
340-
acceptMalformedIndex bool
341-
enableVerticalCompaction bool
342-
compactions prometheus.Counter
343-
compactionRunsStarted prometheus.Counter
344-
compactionRunsCompleted prometheus.Counter
345-
compactionFailures prometheus.Counter
346-
verticalCompactions prometheus.Counter
347-
groupGarbageCollectedBlocks prometheus.Counter
348-
blocksMarkedForDeletion prometheus.Counter
349-
hashFunc metadata.HashFunc
341+
logger log.Logger
342+
bkt objstore.Bucket
343+
key string
344+
labels labels.Labels
345+
resolution int64
346+
mtx sync.Mutex
347+
metasByMinTime []*metadata.Meta
348+
acceptMalformedIndex bool
349+
enableVerticalCompaction bool
350+
compactions prometheus.Counter
351+
compactionRunsStarted prometheus.Counter
352+
compactionRunsCompleted prometheus.Counter
353+
compactionFailures prometheus.Counter
354+
verticalCompactions prometheus.Counter
355+
groupGarbageCollectedBlocks prometheus.Counter
356+
blocksMarkedForDeletion prometheus.Counter
357+
blocksMarkedForNoCompact prometheus.Counter
358+
hashFunc metadata.HashFunc
359+
skipChunksWithOutofOrderBlocks bool
350360
}
351361

352362
// NewGroup returns a new compaction group.
@@ -365,27 +375,31 @@ func NewGroup(
365375
verticalCompactions prometheus.Counter,
366376
groupGarbageCollectedBlocks prometheus.Counter,
367377
blocksMarkedForDeletion prometheus.Counter,
378+
blockMakredForNoCopmact prometheus.Counter,
368379
hashFunc metadata.HashFunc,
380+
skipChunksWithOutOfOrderChunks bool,
369381
) (*Group, error) {
370382
if logger == nil {
371383
logger = log.NewNopLogger()
372384
}
373385
g := &Group{
374-
logger: logger,
375-
bkt: bkt,
376-
key: key,
377-
labels: lset,
378-
resolution: resolution,
379-
acceptMalformedIndex: acceptMalformedIndex,
380-
enableVerticalCompaction: enableVerticalCompaction,
381-
compactions: compactions,
382-
compactionRunsStarted: compactionRunsStarted,
383-
compactionRunsCompleted: compactionRunsCompleted,
384-
compactionFailures: compactionFailures,
385-
verticalCompactions: verticalCompactions,
386-
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
387-
blocksMarkedForDeletion: blocksMarkedForDeletion,
388-
hashFunc: hashFunc,
386+
logger: logger,
387+
bkt: bkt,
388+
key: key,
389+
labels: lset,
390+
resolution: resolution,
391+
acceptMalformedIndex: acceptMalformedIndex,
392+
enableVerticalCompaction: enableVerticalCompaction,
393+
compactions: compactions,
394+
compactionRunsStarted: compactionRunsStarted,
395+
compactionRunsCompleted: compactionRunsCompleted,
396+
compactionFailures: compactionFailures,
397+
verticalCompactions: verticalCompactions,
398+
groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
399+
blocksMarkedForDeletion: blocksMarkedForDeletion,
400+
blocksMarkedForNoCompact: blockMakredForNoCopmact,
401+
hashFunc: hashFunc,
402+
skipChunksWithOutofOrderBlocks: skipChunksWithOutOfOrderChunks,
389403
}
390404
return g, nil
391405
}
@@ -541,6 +555,27 @@ func IsIssue347Error(err error) bool {
541555
return ok
542556
}
543557

558+
// OutOfOrderChunksError is a type wrapper for an out-of-order chunk error found while validating a block index.
559+
type OutOfOrderChunksError struct {
560+
err error
561+
562+
id ulid.ULID
563+
}
564+
565+
func (e OutOfOrderChunksError) Error() string {
566+
return e.err.Error()
567+
}
568+
569+
func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
570+
return OutOfOrderChunksError{err: err, id: brokenBlock}
571+
}
572+
573+
// IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError.
574+
func IsOutOfOrderChunkError(err error) bool {
575+
_, ok := errors.Cause(err).(OutOfOrderChunksError)
576+
return ok
577+
}
578+
544579
// HaltError is a type wrapper for errors that should halt any further progress on compactions.
545580
type HaltError struct {
546581
err error
@@ -749,6 +784,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
749784
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
750785
}
751786

787+
if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutofOrderBlocks && err != nil {
788+
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
789+
}
790+
752791
if err := stats.Issue347OutsideChunksErr(); err != nil {
753792
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
754793
}
@@ -939,6 +978,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
939978
continue
940979
}
941980
}
981+
// If the block has an out-of-order chunk, mark the block for no compaction and continue.
982+
if IsOutOfOrderChunkError(err) {
983+
if err := block.MarkForNoCompact(
984+
ctx,
985+
c.logger,
986+
c.bkt,
987+
err.(OutOfOrderChunksError).id,
988+
metadata.OutOfOrderChunksNoCompactReason,
989+
"OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
990+
mtx.Lock()
991+
finishedAllGroups = false
992+
mtx.Unlock()
993+
continue
994+
}
995+
}
942996
errChan <- errors.Wrapf(err, "group %s", g.Key())
943997
return
944998
}

0 commit comments

Comments
 (0)