@@ -229,18 +229,20 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
 // DefaultGrouper is the Thanos built-in grouper. It groups blocks based on downsample
 // resolution and block's labels.
 type DefaultGrouper struct {
-	bkt                      objstore.Bucket
-	logger                   log.Logger
-	acceptMalformedIndex     bool
-	enableVerticalCompaction bool
-	compactions              *prometheus.CounterVec
-	compactionRunsStarted    *prometheus.CounterVec
-	compactionRunsCompleted  *prometheus.CounterVec
-	compactionFailures       *prometheus.CounterVec
-	verticalCompactions      *prometheus.CounterVec
-	garbageCollectedBlocks   prometheus.Counter
-	blocksMarkedForDeletion  prometheus.Counter
-	hashFunc                 metadata.HashFunc
+	bkt                            objstore.Bucket
+	logger                         log.Logger
+	acceptMalformedIndex           bool
+	enableVerticalCompaction       bool
+	compactions                    *prometheus.CounterVec
+	compactionRunsStarted          *prometheus.CounterVec
+	compactionRunsCompleted        *prometheus.CounterVec
+	compactionFailures             *prometheus.CounterVec
+	verticalCompactions            *prometheus.CounterVec
+	garbageCollectedBlocks         prometheus.Counter
+	blocksMarkedForDeletion        prometheus.Counter
+	blocksMarkedForNoCompact       prometheus.Counter
+	hashFunc                       metadata.HashFunc
+	skipChunksWithOutOfOrderBlocks bool
 }

 // NewDefaultGrouper makes a new DefaultGrouper.
@@ -252,7 +254,9 @@ func NewDefaultGrouper(
 	reg prometheus.Registerer,
 	blocksMarkedForDeletion prometheus.Counter,
 	garbageCollectedBlocks prometheus.Counter,
+	blocksMarkedForNoCompact prometheus.Counter,
 	hashFunc metadata.HashFunc,
+	skipChunksWithOutOfOrderBlocks bool,
 ) *DefaultGrouper {
 	return &DefaultGrouper{
 		bkt:                      bkt,
@@ -279,9 +283,11 @@ func NewDefaultGrouper(
 			Name: "thanos_compact_group_vertical_compactions_total",
 			Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
 		}, []string{"group"}),
-		garbageCollectedBlocks:  garbageCollectedBlocks,
-		blocksMarkedForDeletion: blocksMarkedForDeletion,
-		hashFunc:                hashFunc,
+		blocksMarkedForNoCompact:       blocksMarkedForNoCompact,
+		garbageCollectedBlocks:         garbageCollectedBlocks,
+		blocksMarkedForDeletion:        blocksMarkedForDeletion,
+		hashFunc:                       hashFunc,
+		skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
 	}
 }

@@ -309,7 +315,9 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
 				g.verticalCompactions.WithLabelValues(groupKey),
 				g.garbageCollectedBlocks,
 				g.blocksMarkedForDeletion,
+				g.blocksMarkedForNoCompact,
 				g.hashFunc,
+				g.skipChunksWithOutOfOrderBlocks,
 			)
 			if err != nil {
 				return nil, errors.Wrap(err, "create compaction group")
@@ -330,23 +338,25 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
 // Group captures a set of blocks that have the same origin labels and downsampling resolution.
 // Those blocks generally contain the same series and can thus efficiently be compacted.
 type Group struct {
-	logger                      log.Logger
-	bkt                         objstore.Bucket
-	key                         string
-	labels                      labels.Labels
-	resolution                  int64
-	mtx                         sync.Mutex
-	metasByMinTime              []*metadata.Meta
-	acceptMalformedIndex        bool
-	enableVerticalCompaction    bool
-	compactions                 prometheus.Counter
-	compactionRunsStarted       prometheus.Counter
-	compactionRunsCompleted     prometheus.Counter
-	compactionFailures          prometheus.Counter
-	verticalCompactions         prometheus.Counter
-	groupGarbageCollectedBlocks prometheus.Counter
-	blocksMarkedForDeletion     prometheus.Counter
-	hashFunc                    metadata.HashFunc
+	logger                         log.Logger
+	bkt                            objstore.Bucket
+	key                            string
+	labels                         labels.Labels
+	resolution                     int64
+	mtx                            sync.Mutex
+	metasByMinTime                 []*metadata.Meta
+	acceptMalformedIndex           bool
+	enableVerticalCompaction       bool
+	compactions                    prometheus.Counter
+	compactionRunsStarted          prometheus.Counter
+	compactionRunsCompleted        prometheus.Counter
+	compactionFailures             prometheus.Counter
+	verticalCompactions            prometheus.Counter
+	groupGarbageCollectedBlocks    prometheus.Counter
+	blocksMarkedForDeletion        prometheus.Counter
+	blocksMarkedForNoCompact       prometheus.Counter
+	hashFunc                       metadata.HashFunc
+	skipChunksWithOutOfOrderBlocks bool
 }

 // NewGroup returns a new compaction group.
@@ -365,27 +375,31 @@ func NewGroup(
 	verticalCompactions prometheus.Counter,
 	groupGarbageCollectedBlocks prometheus.Counter,
 	blocksMarkedForDeletion prometheus.Counter,
+	blocksMarkedForNoCompact prometheus.Counter,
 	hashFunc metadata.HashFunc,
+	skipChunksWithOutOfOrderBlocks bool,
 ) (*Group, error) {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 	g := &Group{
-		logger:                      logger,
-		bkt:                         bkt,
-		key:                         key,
-		labels:                      lset,
-		resolution:                  resolution,
-		acceptMalformedIndex:        acceptMalformedIndex,
-		enableVerticalCompaction:    enableVerticalCompaction,
-		compactions:                 compactions,
-		compactionRunsStarted:       compactionRunsStarted,
-		compactionRunsCompleted:     compactionRunsCompleted,
-		compactionFailures:          compactionFailures,
-		verticalCompactions:         verticalCompactions,
-		groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
-		blocksMarkedForDeletion:     blocksMarkedForDeletion,
-		hashFunc:                    hashFunc,
+		logger:                         logger,
+		bkt:                            bkt,
+		key:                            key,
+		labels:                         lset,
+		resolution:                     resolution,
+		acceptMalformedIndex:           acceptMalformedIndex,
+		enableVerticalCompaction:       enableVerticalCompaction,
+		compactions:                    compactions,
+		compactionRunsStarted:          compactionRunsStarted,
+		compactionRunsCompleted:        compactionRunsCompleted,
+		compactionFailures:             compactionFailures,
+		verticalCompactions:            verticalCompactions,
+		groupGarbageCollectedBlocks:    groupGarbageCollectedBlocks,
+		blocksMarkedForDeletion:        blocksMarkedForDeletion,
+		blocksMarkedForNoCompact:       blocksMarkedForNoCompact,
+		hashFunc:                       hashFunc,
+		skipChunksWithOutOfOrderBlocks: skipChunksWithOutOfOrderBlocks,
 	}
 	return g, nil
 }
@@ -541,6 +555,26 @@ func IsIssue347Error(err error) bool {
 	return ok
 }

+// OutOfOrderChunksError is a type wrapper for out-of-order chunk errors found while validating a block index.
+type OutOfOrderChunksError struct {
+	err error
+	id  ulid.ULID
+}
+
+func (e OutOfOrderChunksError) Error() string {
+	return e.err.Error()
+}
+
+func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksError {
+	return OutOfOrderChunksError{err: err, id: brokenBlock}
+}
+
+// IsOutOfOrderChunkError returns true if the base error is an OutOfOrderChunksError.
+func IsOutOfOrderChunkError(err error) bool {
+	_, ok := errors.Cause(err).(OutOfOrderChunksError)
+	return ok
+}
+
 // HaltError is a type wrapper for errors that should halt any further progress on compactions.
 type HaltError struct {
 	err error
@@ -749,6 +783,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
 			return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
 		}

+		if err := stats.OutOfOrderChunksErr(); cg.skipChunksWithOutOfOrderBlocks && err != nil {
+			return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
+		}
+
 		if err := stats.Issue347OutsideChunksErr(); err != nil {
 			return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
 		}
@@ -939,6 +977,21 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) {
 						continue
 					}
 				}
+				// If a block has out-of-order chunks, mark it for no compaction and continue.
+				if IsOutOfOrderChunkError(err) {
+					if err := block.MarkForNoCompact(
+						ctx,
+						c.logger,
+						c.bkt,
+						err.(OutOfOrderChunksError).id,
+						metadata.OutOfOrderChunksNoCompactReason,
+						"OutofOrderChunk: marking block with out-of-order series/chunks as no compact to unblock compaction", g.blocksMarkedForNoCompact); err == nil {
+						mtx.Lock()
+						finishedAllGroups = false
+						mtx.Unlock()
+						continue
+					}
+				}
 				errChan <- errors.Wrapf(err, "group %s", g.Key())
 				return
 			}
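Reviewer note: the heart of this change is an error type that carries the offending block's ULID through the call stack, so BucketCompactor.Compact can mark exactly that block for no compaction instead of failing the whole group. Below is a minimal standalone sketch of that pattern, not part of the diff; the names oooChunksError, isOOOChunksError, and main are illustrative only, and the only library calls are github.com/oklog/ulid and github.com/pkg/errors, which the code above already uses.

// Standalone sketch: a typed error wrapping a block ID, recovered via errors.Cause
// after further wrapping, mirroring OutOfOrderChunksError / IsOutOfOrderChunkError above.
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/oklog/ulid"
	"github.com/pkg/errors"
)

// oooChunksError mimics OutOfOrderChunksError: the validation error plus the broken block's ID.
type oooChunksError struct {
	err error
	id  ulid.ULID
}

func (e oooChunksError) Error() string { return e.err.Error() }

// isOOOChunksError mimics IsOutOfOrderChunkError: unwrap with errors.Cause and type-assert.
func isOOOChunksError(err error) bool {
	_, ok := errors.Cause(err).(oooChunksError)
	return ok
}

func main() {
	id := ulid.MustNew(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))

	// compact() would wrap the index-stats error together with the block ID...
	err := error(oooChunksError{err: fmt.Errorf("5/1000 series have out-of-order chunks"), id: id})
	// ...and the caller sees it only after additional context has been added.
	err = errors.Wrapf(err, "group %s", "0@{cluster=\"eu1\"}")

	// The exact block can still be recovered and marked for no compaction.
	if isOOOChunksError(err) {
		fmt.Println("would mark for no compaction:", errors.Cause(err).(oooChunksError).id)
	}
}

Keeping the ULID on the error value (rather than only in the message) is what lets the compactor call block.MarkForNoCompact on the single broken block and carry on with the rest of the group.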