@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
306
306
// function in logs, and forces a compaction.
307
307
dropPrefixes : prefixes ,
308
308
}
309
- if err := s .doCompact (cp ); err != nil {
309
+ if err := s .doCompact (174 , cp ); err != nil {
310
310
opt .Warningf ("While compacting level 0: %v" , err )
311
311
return nil
312
312
}
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
366
366
n := s .kv .opt .NumCompactors
367
367
lc .AddRunning (n - 1 )
368
368
for i := 0 ; i < n ; i ++ {
369
- go s .runWorker (lc )
369
+ // The worker with id=0 is dedicated to L0 and L1. This is not counted
370
+ // towards the user specified NumCompactors.
371
+ go s .runCompactor (i , lc )
370
372
}
371
373
}
372
374
373
- func (s * levelsController ) runWorker ( lc * y.Closer ) {
375
+ func (s * levelsController ) runCompactor ( id int , lc * y.Closer ) {
374
376
defer lc .Done ()
375
377
376
378
randomDelay := time .NewTimer (time .Duration (rand .Int31n (1000 )) * time .Millisecond )
@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
381
383
return
382
384
}
383
385
384
- ticker := time .NewTicker (time .Second )
386
+ ticker := time .NewTicker (100 * time .Millisecond )
385
387
defer ticker .Stop ()
386
388
387
389
for {
@@ -391,7 +393,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
391
393
prios := s .pickCompactLevels ()
392
394
loop:
393
395
for _ , p := range prios {
394
- err := s .doCompact (p )
396
+ if id == 0 && p .level > 1 {
397
+ // If I'm ID zero, I only compact L0 and L1.
398
+ continue
399
+ }
400
+ if id != 0 && p .level <= 1 {
401
+ // If I'm ID non-zero, I do NOT compact L0 and L1.
402
+ continue
403
+ }
404
+ err := s .doCompact (id , p )
395
405
switch err {
396
406
case nil :
397
407
break loop
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
453
463
prios = append (prios , pri )
454
464
}
455
465
}
456
- // We used to sort compaction priorities based on the score. But, we
457
- // decided to compact based on the level, not the priority. So, upper
458
- // levels (level 0, level 1, etc) always get compacted first, before the
459
- // lower levels -- this allows us to avoid stalls.
466
+ // We should continue to sort the compaction priorities by score. Now that we have a dedicated
467
+ // compactor for L0 and L1, we don't need to sort by level here.
468
+ sort .Slice (prios , func (i , j int ) bool {
469
+ return prios [i ].score > prios [j ].score
470
+ })
460
471
return prios
461
472
}
462
473
@@ -541,15 +552,13 @@ nextTable:
541
552
// that would affect the snapshot view guarantee provided by transactions.
542
553
discardTs := s .kv .orc .discardAtOrBelow ()
543
554
544
- // Start generating new tables.
545
- type newTableResult struct {
546
- table * table.Table
547
- err error
548
- }
549
- resultCh := make (chan newTableResult )
550
555
var numBuilds , numVersions int
551
556
var lastKey , skipKey []byte
552
557
var vp valuePointer
558
+ var newTables []* table.Table
559
+ mu := new (sync.Mutex ) // Guards newTables
560
+
561
+ inflightBuilders := y .NewThrottle (5 )
553
562
for it .Valid () {
554
563
timeStart := time .Now ()
555
564
dk , err := s .kv .registry .latestDataKey ()
@@ -646,67 +655,66 @@ nextTable:
646
655
// called Add() at least once, and builder is not Empty().
647
656
s .kv .opt .Debugf ("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v" ,
648
657
numKeys , numSkips , time .Since (timeStart ))
649
- build := func (fileID uint64 ) (* table.Table , error ) {
650
- fd , err := y .CreateSyncedFile (table .NewFilename (fileID , s .kv .opt .Dir ), true )
651
- if err != nil {
652
- return nil , errors .Wrapf (err , "While opening new table: %d" , fileID )
653
- }
654
-
655
- if _ , err := fd .Write (builder .Finish ()); err != nil {
656
- return nil , errors .Wrapf (err , "Unable to write to file: %d" , fileID )
657
- }
658
- tbl , err := table .OpenTable (fd , bopts )
659
- // decrRef is added below.
660
- return tbl , errors .Wrapf (err , "Unable to open table: %q" , fd .Name ())
661
- }
662
658
if builder .Empty () {
663
659
continue
664
660
}
665
661
numBuilds ++
666
662
fileID := s .reserveFileID ()
663
+ if err := inflightBuilders .Do (); err != nil {
664
+ // Can't return from here, until I decrRef all the tables that I built so far.
665
+ break
666
+ }
667
667
go func (builder * table.Builder ) {
668
668
defer builder .Close ()
669
- var (
670
- tbl * table.Table
671
- err error
672
- )
669
+ defer inflightBuilders .Done (err )
670
+
671
+ build := func (fileID uint64 ) (* table.Table , error ) {
672
+ fd , err := y .CreateSyncedFile (table .NewFilename (fileID , s .kv .opt .Dir ), true )
673
+ if err != nil {
674
+ return nil , errors .Wrapf (err , "While opening new table: %d" , fileID )
675
+ }
676
+
677
+ if _ , err := fd .Write (builder .Finish ()); err != nil {
678
+ return nil , errors .Wrapf (err , "Unable to write to file: %d" , fileID )
679
+ }
680
+ tbl , err := table .OpenTable (fd , bopts )
681
+ // decrRef is added below.
682
+ return tbl , errors .Wrapf (err , "Unable to open table: %q" , fd .Name ())
683
+ }
684
+
685
+ var tbl * table.Table
686
+ var err error
673
687
if s .kv .opt .InMemory {
674
688
tbl , err = table .OpenInMemoryTable (builder .Finish (), fileID , & bopts )
675
689
} else {
676
690
tbl , err = build (fileID )
677
691
}
678
- resultCh <- newTableResult {tbl , err }
679
- }(builder )
680
- }
681
692
682
- newTables := make ([] * table. Table , 0 , 20 )
683
- // Wait for all table builders to finish.
684
- var firstErr error
685
- for x := 0 ; x < numBuilds ; x ++ {
686
- res := <- resultCh
687
- newTables = append ( newTables , res . table )
688
- if firstErr == nil {
689
- firstErr = res . err
690
- }
693
+ // If we couldn't build the table, return fast.
694
+ if err != nil {
695
+ return
696
+ }
697
+
698
+ mu . Lock ( )
699
+ newTables = append ( newTables , tbl )
700
+ mu . Unlock ()
701
+ }( builder )
691
702
}
692
703
693
- if firstErr == nil {
704
+ // Wait for all table builders to finish and also for newTables accumulator to finish.
705
+ err := inflightBuilders .Finish ()
706
+ if err == nil {
694
707
// Ensure created files' directory entries are visible. We don't mind the extra latency
695
708
// from not doing this ASAP after all file creation has finished because this is a
696
709
// background operation.
697
- firstErr = s .kv .syncDir (s .kv .opt .Dir )
710
+ err = s .kv .syncDir (s .kv .opt .Dir )
698
711
}
699
712
700
- if firstErr != nil {
713
+ if err != nil {
701
714
// An error happened. Delete all the newly created table files (by calling DecrRef
702
715
// -- we're the only holders of a ref).
703
- for j := 0 ; j < numBuilds ; j ++ {
704
- if newTables [j ] != nil {
705
- _ = newTables [j ].DecrRef ()
706
- }
707
- }
708
- errorReturn := errors .Wrapf (firstErr , "While running compaction for: %+v" , cd )
709
- return nil , nil , errorReturn
716
+ _ = decrRefs (newTables )
717
+ return nil , nil , errors .Wrapf (err , "while running compactions for: %+v" , cd )
710
718
}
711
719
712
720
sort .Slice (newTables , func (i , j int ) bool {
@@ -960,7 +968,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
960
968
var errFillTables = errors .New ("Unable to fill tables" )
961
969
962
970
// doCompact picks some table on level l and compacts it away to the next level.
963
- func (s * levelsController ) doCompact (p compactionPriority ) error {
971
+ func (s * levelsController ) doCompact (id int , p compactionPriority ) error {
964
972
l := p .level
965
973
y .AssertTrue (l + 1 < s .kv .opt .MaxLevels ) // Sanity check.
966
974
@@ -973,7 +981,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
973
981
cd .elog .SetMaxEvents (100 )
974
982
defer cd .elog .Finish ()
975
983
976
- s .kv .opt .Infof ( "Got compaction priority : %+v" , p )
984
+ s .kv .opt .Debugf ( "[Compactor: %d] Attempting to run compaction : %+v", id , p )
977
985
978
986
// While picking tables to be compacted, both levels' tables are expected to
979
987
// remain unchanged.
@@ -989,16 +997,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
989
997
}
990
998
defer s .cstatus .delete (cd ) // Remove the ranges from compaction status.
991
999
992
- s .kv .opt .Infof ("Running for level: %d\n " , cd .thisLevel .level )
1000
+ s .kv .opt .Infof ("[Compactor: %d] Running compaction: %+v for level: %d\n " ,
1001
+ id , p , cd .thisLevel .level )
993
1002
s .cstatus .toLog (cd .elog )
994
1003
if err := s .runCompactDef (l , cd ); err != nil {
995
1004
// This compaction couldn't be done successfully.
996
- s .kv .opt .Warningf ("LOG Compact FAILED with error: %+v: %+v" , err , cd )
1005
+ s .kv .opt .Warningf ("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v" , id , err , cd )
997
1006
return err
998
1007
}
999
1008
1000
1009
s .cstatus .toLog (cd .elog )
1001
- s .kv .opt .Infof ("Compaction for level: %d DONE" , cd .thisLevel .level )
1010
+ s .kv .opt .Infof ("[Compactor: %d] Compaction for level: %d DONE" , id , cd .thisLevel .level )
1002
1011
return nil
1003
1012
}
1004
1013
@@ -1022,7 +1031,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
1022
1031
// Stall. Make sure all levels are healthy before we unstall.
1023
1032
var timeStart time.Time
1024
1033
{
1025
- s .kv .opt .Debugf ("STALLED STALLED STALLED: %v\n " , time .Since (s .lastUnstalled ))
1034
+ s .kv .opt .Infof ("STALLED STALLED STALLED: %v\n " , time .Since (s .lastUnstalled ))
1026
1035
s .cstatus .RLock ()
1027
1036
for i := 0 ; i < s .kv .opt .MaxLevels ; i ++ {
1028
1037
s .kv .opt .Debugf ("level=%d. Status=%s Size=%d\n " ,
0 commit comments