Commit b0fbe2d

skip blocks with out-of-order chunk during compaction
Signed-off-by: Yang Hu <[email protected]>
1 parent ce66bbd commit b0fbe2d

File tree

9 files changed: +307 additions, -85 deletions


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#4444](https://github.com/thanos-io/thanos/pull/4444) UI: Add search block UI.
 - [#4509](https://github.com/thanos-io/thanos/pull/4509) Logging: Adds duration_ms in int64 to the logs.
 - [#4462](https://github.com/thanos-io/thanos/pull/4462) UI: Add find overlap block UI
+- [#4469](https://github.com/thanos-io/thanos/pull/4469) Compact: Add flag `compact.skip-block-with-out-of-order-chunks` to skip blocks with out-of-order chunks during compaction instead of halting
 
 ### Fixed
 
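
The flag is registered as hidden and defaults to false (see cmd/thanos/compact.go below), so the new behaviour is opt-in: an operator has to pass `--compact.skip-block-with-out-of-order-chunks` explicitly on the `thanos compact` command line to have such blocks marked for no-compaction instead of the compactor halting.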

cmd/thanos/compact.go

Lines changed: 13 additions & 6 deletions

@@ -147,9 +147,10 @@ func newCompactMetrics(reg *prometheus.Registry, deleteDelay time.Duration) *com
 	m.blocksMarked = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "thanos_compact_blocks_marked_total",
 		Help: "Total number of blocks marked in compactor.",
-	}, []string{"marker"})
-	m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename)
-	m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)
+	}, []string{"marker", "reason"})
+	m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason)
+	m.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason)
+	m.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, "")
 
 	m.garbageCollectedBlocks = promauto.With(reg).NewCounter(prometheus.CounterOpts{
 		Name: "thanos_compact_garbage_collected_blocks_total",

@@ -281,7 +282,7 @@ func runCompact(
 		cf,
 		duplicateBlocksFilter,
 		ignoreDeletionMarkFilter,
-		compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
+		compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
 		compactMetrics.garbageCollectedBlocks,
 		conf.blockSyncConcurrency)
 	if err != nil {

@@ -347,15 +348,17 @@ func runCompact(
 		conf.acceptMalformedIndex,
 		enableVerticalCompaction,
 		reg,
-		compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
+		compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
 		compactMetrics.garbageCollectedBlocks,
+		compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
 		metadata.HashFunc(conf.hashFunc),
+		conf.skipBlockWithOutOfOrderChunks,
 	)
 	planner := compact.WithLargeTotalIndexSizeFilter(
 		compact.NewPlanner(logger, levels, noCompactMarkerFilter),
 		bkt,
 		int64(conf.maxBlockIndexSize),
-		compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
+		compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.IndexSizeExceedingNoCompactReason),
 	)
 	blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
 	compactor, err := compact.NewBucketCompactor(

@@ -585,6 +588,7 @@ type compactConfig struct {
 	hashFunc string
 	enableVerticalCompaction bool
 	dedupFunc string
+	skipBlockWithOutOfOrderChunks bool
 }
 
 func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

@@ -668,6 +672,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
 		"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
 		Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)
 
+	cmd.Flag("compact.skip-block-with-out-of-order-chunks", "When set to true, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction").
+		Hidden().Default("false").BoolVar(&cc.skipBlockWithOutOfOrderChunks)
+
 	cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
 		Default("").EnumVar(&cc.hashFunc, "SHA256", "")
 
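
For context, the metric change above widens thanos_compact_blocks_marked_total from one label ("marker") to two ("marker", "reason"), and pre-touches each expected label combination so every series is exported at 0 before anything is actually marked. The following is a standalone sketch of that Prometheus pattern, not the Thanos code itself; the string literals stand in for the metadata.NoCompactMarkFilename / metadata.DeletionMarkFilename constants and the no-compact reason constants used in the real registration:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Counter vector with the new "reason" dimension alongside "marker".
	blocksMarked := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_compact_blocks_marked_total",
		Help: "Total number of blocks marked in compactor.",
	}, []string{"marker", "reason"})

	// Pre-initialize the expected label combinations so each series is
	// visible at 0 from the first scrape. The literals stand in for the
	// metadata constants used by the compactor.
	blocksMarked.WithLabelValues("no-compact-mark.json", "block-index-out-of-order-chunk")
	blocksMarked.WithLabelValues("no-compact-mark.json", "index-size-exceeding")
	blocksMarked.WithLabelValues("deletion-mark.json", "")

	// A specific marking event later increments exactly one of those series.
	blocksMarked.WithLabelValues("no-compact-mark.json", "block-index-out-of-order-chunk").Inc()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Printf("%s: %d series\n", mf.GetName(), len(mf.GetMetric()))
	}
}

Passing a pre-dimensioned child (WithLabelValues(...)) into the syncer, grouper and planner, as the diff above does, keeps each component unaware of the label scheme while still attributing increments to the right (marker, reason) pair.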

pkg/block/index.go

Lines changed: 13 additions & 6 deletions

@@ -111,12 +111,9 @@ func (i HealthStats) Issue347OutsideChunksErr() error {
 	return nil
 }
 
-// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure.
-func (i HealthStats) CriticalErr() error {
-	var errMsg []string
-
-	if i.OutOfOrderSeries > 0 {
-		errMsg = append(errMsg, fmt.Sprintf(
+func (i HealthStats) OutOfOrderChunksErr() error {
+	if i.OutOfOrderChunks > 0 {
+		return errors.New(fmt.Sprintf(
 			"%d/%d series have an average of %.3f out-of-order chunks: "+
 				"%.3f of these are exact duplicates (in terms of data and time range)",
 			i.OutOfOrderSeries,

@@ -125,6 +122,12 @@ func (i HealthStats) CriticalErr() error {
 			float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks),
 		))
 	}
+	return nil
+}
+
+// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure.
+func (i HealthStats) CriticalErr() error {
+	var errMsg []string
 
 	n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks)
 	if n > 0 {

@@ -158,6 +161,10 @@ func (i HealthStats) AnyErr() error {
 		errMsg = append(errMsg, err.Error())
 	}
 
+	if err := i.OutOfOrderChunksErr(); err != nil {
+		errMsg = append(errMsg, err.Error())
+	}
+
 	if len(errMsg) > 0 {
 		return errors.New(strings.Join(errMsg, ", "))
 	}

pkg/block/index_test.go

Lines changed: 14 additions & 0 deletions

@@ -6,6 +6,7 @@ package block
 import (
 	"context"
 	"io/ioutil"
+	"math"
 	"os"
 	"path/filepath"
 	"testing"

@@ -83,5 +84,18 @@ func TestRewrite(t *testing.T) {
 		testutil.Ok(t, ir2.Series(p.At(), &lset, &chks))
 		testutil.Equals(t, 1, len(chks))
 	}
+}
+
+func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) {
+	blockDir, err := ioutil.TempDir("", "test-ooo-index")
+	testutil.Ok(t, err)
 
+	err = testutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64)
+	testutil.Ok(t, err)
+
+	stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64)
+
+	testutil.Ok(t, err)
+	testutil.Equals(t, 1, stats.OutOfOrderChunks)
+	testutil.NotOk(t, stats.OutOfOrderChunksErr())
 }

pkg/block/metadata/markers.go

Lines changed: 2 additions & 0 deletions

@@ -67,6 +67,8 @@ const (
 	// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
 	// This reason can be ignored when vertical block sharding will be implemented.
 	IndexSizeExceedingNoCompactReason = "index-size-exceeding"
+	// OutOfOrderChunksNoCompactReason is a reason of to no compact block with index contains out of order chunk so that the compaction is not blocked.
+	OutOfOrderChunksNoCompactReason = "block-index-out-of-order-chunk"
 )
 
 // NoCompactMark marker stores reason of block being excluded from compaction if needed.

pkg/compact/compact.go

Lines changed: 100 additions & 47 deletions

@@ -229,18 +229,20 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
 // DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample
 // resolution and block's labels.
 type DefaultGrouper struct {
-	bkt                      objstore.Bucket
-	logger                   log.Logger
-	acceptMalformedIndex     bool
-	enableVerticalCompaction bool
-	compactions              *prometheus.CounterVec
-	compactionRunsStarted    *prometheus.CounterVec
-	compactionRunsCompleted  *prometheus.CounterVec
-	compactionFailures       *prometheus.CounterVec
-	verticalCompactions      *prometheus.CounterVec
-	garbageCollectedBlocks   prometheus.Counter
-	blocksMarkedForDeletion  prometheus.Counter
-	hashFunc                 metadata.HashFunc
+	bkt                            objstore.Bucket
+	logger                         log.Logger
+	acceptMalformedIndex           bool
+	enableVerticalCompaction       bool
+	compactions                    *prometheus.CounterVec
+	compactionRunsStarted          *prometheus.CounterVec
+	compactionRunsCompleted        *prometheus.CounterVec
+	compactionFailures             *prometheus.CounterVec
+	verticalCompactions            *prometheus.CounterVec
+	garbageCollectedBlocks         prometheus.Counter
+	blocksMarkedForDeletion        prometheus.Counter
+	blocksMarkedForNoCompact       prometheus.Counter
+	hashFunc                       metadata.HashFunc
+	skipChunksWithOutOfOrderBlocks bool
 }
 
 // NewDefaultGrouper makes a new DefaultGrouper.

@@ -252,7 +254,9 @@ func NewDefaultGrouper(
 	reg prometheus.Registerer,
 	blocksMarkedForDeletion prometheus.Counter,
 	garbageCollectedBlocks prometheus.Counter,
+	blocksMarkedForNoCompact prometheus.Counter,
 	hashFunc metadata.HashFunc,
+	skipChunksWithOutOfOrderBlocks bool,
 ) *DefaultGrouper {
 	return &DefaultGrouper{
 		bkt:                      bkt,

@@ -279,9 +283,11 @@ func NewDefaultGrouper(
 			Name: "thanos_compact_group_vertical_compactions_total",
 			Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
 		}, []string{"group"}),
-		garbageCollectedBlocks:  garbageCollectedBlocks,
-		blocksMarkedForDeletion: blocksMarkedForDeletion,
-		hashFunc:                hashFunc,
+		blocksMarkedForNoCompact:       blocksMarkedForNoCompact,
+		garbageCollectedBlocks:         garbageCollectedBlocks,
+		blocksMarkedForDeletion:        blocksMarkedForDeletion,
+		hashFunc:                       hashFunc,
+		skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
 	}
 }
 

@@ -309,7 +315,9 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
 				g.verticalCompactions.WithLabelValues(groupKey),
 				g.garbageCollectedBlocks,
 				g.blocksMarkedForDeletion,
+				g.blocksMarkedForNoCompact,
 				g.hashFunc,
+				g.skipChunksWithOutOfOrderBlocks,
 			)
 			if err != nil {
 				return nil, errors.Wrap(err, "create compaction group")

@@ -330,23 +338,25 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
 // Group captures a set of blocks that have the same origin labels and downsampling resolution.
 // Those blocks generally contain the same series and can thus efficiently be compacted.
 type Group struct {
-	logger                      log.Logger
-	bkt                         objstore.Bucket
-	key                         string
-	labels                      labels.Labels
-	resolution                  int64
-	mtx                         sync.Mutex
-	metasByMinTime              []*metadata.Meta
-	acceptMalformedIndex        bool
-	enableVerticalCompaction    bool
-	compactions                 prometheus.Counter
-	compactionRunsStarted       prometheus.Counter
-	compactionRunsCompleted     prometheus.Counter
-	compactionFailures          prometheus.Counter
-	verticalCompactions         prometheus.Counter
-	groupGarbageCollectedBlocks prometheus.Counter
-	blocksMarkedForDeletion     prometheus.Counter
-	hashFunc                    metadata.HashFunc
+	logger                         log.Logger
+	bkt                            objstore.Bucket
+	key                            string
+	labels                         labels.Labels
+	resolution                     int64
+	mtx                            sync.Mutex
+	metasByMinTime                 []*metadata.Meta
+	acceptMalformedIndex           bool
+	enableVerticalCompaction       bool
+	compactions                    prometheus.Counter
+	compactionRunsStarted          prometheus.Counter
+	compactionRunsCompleted        prometheus.Counter
+	compactionFailures             prometheus.Counter
+	verticalCompactions            prometheus.Counter
+	groupGarbageCollectedBlocks    prometheus.Counter
+	blocksMarkedForDeletion        prometheus.Counter
+	blocksMarkedForNoCompact       prometheus.Counter
+	hashFunc                       metadata.HashFunc
+	skipChunksWithOutofOrderBlocks bool
 }
 
 // NewGroup returns a new compaction group.

@@ -365,27 +375,31 @@ func NewGroup(
 	verticalCompactions prometheus.Counter,
 	groupGarbageCollectedBlocks prometheus.Counter,
 	blocksMarkedForDeletion prometheus.Counter,
+	blockMakredForNoCopmact prometheus.Counter,
 	hashFunc metadata.HashFunc,
+	skipChunksWithOutOfOrderChunks bool,
 ) (*Group, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	g := &Group{
-		logger:                      logger,
-		bkt:                         bkt,
-		key:                         key,
-		labels:                      lset,
-		resolution:                  resolution,
-		acceptMalformedIndex:        acceptMalformedIndex,
-		enableVerticalCompaction:    enableVerticalCompaction,
-		compactions:                 compactions,
-		compactionRunsStarted:       compactionRunsStarted,
-		compactionRunsCompleted:     compactionRunsCompleted,
-		compactionFailures:          compactionFailures,
-		verticalCompactions:         verticalCompactions,
-		groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
-		blocksMarkedForDeletion:     blocksMarkedForDeletion,
-		hashFunc:                    hashFunc,
+		logger:                         logger,
+		bkt:                            bkt,
+		key:                            key,
+		labels:                         lset,
+		resolution:                     resolution,
+		acceptMalformedIndex:           acceptMalformedIndex,
+		enableVerticalCompaction:       enableVerticalCompaction,
+		compactions:                    compactions,
+		compactionRunsStarted:          compactionRunsStarted,
+		compactionRunsCompleted:        compactionRunsCompleted,
+		compactionFailures:             compactionFailures,
+		verticalCompactions:            verticalCompactions,
+		groupGarbageCollectedBlocks:    groupGarbageCollectedBlocks,
+		blocksMarkedForDeletion:        blocksMarkedForDeletion,
+		blocksMarkedForNoCompact:       blockMakredForNoCopmact,
+		hashFunc:                       hashFunc,
+		skipChunksWithOutofOrderBlocks: skipChunksWithOutOfOrderChunks,
 	}
 	return g, nil
 }

@@ -541,6 +555,26 @@ func IsIssue347Error(err error) bool {
 	return ok
 }
 
+// OutOfOrderChunkError is a type wrapper for OOO chunk error from validating block index.
+type OutOfOrderChunksError struct {
+	err error
+	id  ulid.ULID
+}
+
+func (e OutOfOrderChunksError) Error() string {
+	return e.err.Error()
+}
+
+func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
+	return OutOfOrderChunksError{err: err, id: brokenBlock}
+}
+
+// IsOutOfOrderChunk returns true if the base error is a OutOfOrderChunkError.
+func IsOutOfOrderChunkError(err error) bool {
+	_, ok := errors.Cause(err).(OutOfOrderChunksError)
+	return ok
+}
+
 // HaltError is a type wrapper for errors that should halt any further progress on compactions.
 type HaltError struct {
 	err error

@@ -749,6 +783,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 			return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
 		}
 
+		if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutofOrderBlocks && err != nil {
+			return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
+		}
+
 		if err := stats.Issue347OutsideChunksErr(); err != nil {
 			return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
 		}

@@ -939,6 +977,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 						continue
 					}
 				}
+				// if block has out of order chunk, mark the block for no compaction and continue.
+				if IsOutOfOrderChunkError(err) {
+					if err := block.MarkForNoCompact(
+						ctx,
+						c.logger,
+						c.bkt,
+						err.(OutOfOrderChunksError).id,
+						metadata.OutOfOrderChunksNoCompactReason,
+						"OutofOrderChunk: marking block with out-of-order series/chunks to as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
+						mtx.Lock()
+						finishedAllGroups = false
+						mtx.Unlock()
+						continue
+					}
+				}
 				errChan <- errors.Wrapf(err, "group %s", g.Key())
 				return
 			}
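
A note on the error plumbing above: outOfOrderChunkError carries the offending block's ULID inside the error value, and IsOutOfOrderChunkError relies on errors.Cause from github.com/pkg/errors to see through wrapping added between block validation and the compactor loop, so the caller can recover the block ID and mark exactly that block for no-compaction. The following is a small self-contained sketch of that pattern under those assumptions, with a locally defined error type that merely mirrors the one added in this commit (the group-key string is an illustrative value):

package main

import (
	"fmt"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"
)

// oooChunksError mirrors the OutOfOrderChunksError added in this commit:
// it wraps the validation error and remembers which block is affected.
type oooChunksError struct {
	err error
	id  ulid.ULID
}

func (e oooChunksError) Error() string { return e.err.Error() }

// isOOOChunksError checks the root cause, so wrapping applied by
// intermediate layers does not hide the classification.
func isOOOChunksError(err error) bool {
	_, ok := errors.Cause(err).(oooChunksError)
	return ok
}

func main() {
	var blockID ulid.ULID // zero ULID, only for illustration

	base := oooChunksError{
		err: errors.New("1/10 series have an average of 1.000 out-of-order chunks"),
		id:  blockID,
	}

	// Wrapping with pkg/errors keeps the cause recoverable, which is why
	// the compactor can still classify the error after context is added.
	wrapped := errors.Wrapf(base, "group %s", "0@17241709254077376921")

	if isOOOChunksError(wrapped) {
		cause := errors.Cause(wrapped).(oooChunksError)
		fmt.Println("mark for no-compaction instead of halting:", cause.id)
	}
}

Because the check happens on the cause rather than the outermost error, the skip path stays robust even if more wrapping is introduced between Group.compact and BucketCompactor.Compact later on.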
