@@ -109,7 +109,7 @@ type DB struct {
109
109
lc * levelsController
110
110
vlog valueLog
111
111
writeCh chan * request
112
- flushChan chan flushTask // For flushing memtables.
112
+ flushChan chan * memTable // For flushing memtables.
113
113
closeOnce sync.Once // For closing DB only once.
114
114
115
115
blockWrites int32
@@ -240,7 +240,7 @@ func Open(opt Options) (*DB, error) {
240
240
241
241
db := & DB {
242
242
imm : make ([]* memTable , 0 , opt .NumMemtables ),
243
- flushChan : make (chan flushTask , opt .NumMemtables ),
243
+ flushChan : make (chan * memTable , opt .NumMemtables ),
244
244
writeCh : make (chan * request , kvWriteChCapacity ),
245
245
opt : opt ,
246
246
manifest : manifestFile ,
@@ -355,7 +355,7 @@ func Open(opt Options) (*DB, error) {
355
355
}()
356
356
// Flush them to disk asap.
357
357
for _ , mt := range db .imm {
358
- db .flushChan <- flushTask { mt : mt }
358
+ db .flushChan <- mt
359
359
}
360
360
}
361
361
// We do increment nextTxnTs below. So, no need to do it here.
@@ -568,12 +568,12 @@ func (db *DB) close() (err error) {
568
568
} else {
569
569
db .opt .Debugf ("Flushing memtable" )
570
570
for {
571
- pushedFlushTask := func () bool {
571
+ pushedMemTable := func () bool {
572
572
db .lock .Lock ()
573
573
defer db .lock .Unlock ()
574
574
y .AssertTrue (db .mt != nil )
575
575
select {
576
- case db .flushChan <- flushTask { mt : db .mt } :
576
+ case db .flushChan <- db .mt :
577
577
db .imm = append (db .imm , db .mt ) // Flusher will attempt to remove this from s.imm.
578
578
db .mt = nil // Will segfault if we try writing!
579
579
db .opt .Debugf ("pushed to flush chan\n " )
@@ -586,7 +586,7 @@ func (db *DB) close() (err error) {
586
586
}
587
587
return false
588
588
}()
589
- if pushedFlushTask {
589
+ if pushedMemTable {
590
590
break
591
591
}
592
592
time .Sleep (10 * time .Millisecond )
@@ -826,6 +826,7 @@ func (db *DB) writeRequests(reqs []*request) error {
826
826
}
827
827
count += len (b .Entries )
828
828
var i uint64
829
+ var err error
829
830
for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
830
831
i ++
831
832
if i % 100 == 0 {
@@ -987,7 +988,7 @@ func (db *DB) ensureRoomForWrite() error {
987
988
}
988
989
989
990
select {
990
- case db .flushChan <- flushTask { mt : db .mt } :
991
+ case db .flushChan <- db .mt :
991
992
db .opt .Debugf ("Flushing memtable, mt.size=%d size of flushChan: %d\n " ,
992
993
db .mt .sl .MemSize (), len (db .flushChan ))
993
994
// We manage to push this task. Let's modify imm.
@@ -1009,12 +1010,12 @@ func arenaSize(opt Options) int64 {
1009
1010
}
1010
1011
1011
1012
// buildL0Table builds a new table from the memtable.
1012
- func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1013
- iter := ft .mt .sl .NewIterator ()
1013
+ func buildL0Table (iter y.Iterator , dropPrefixes [][]byte , bopts table.Options ) * table.Builder {
1014
1014
defer iter .Close ()
1015
+
1015
1016
b := table .NewTableBuilder (bopts )
1016
- for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1017
- if len (ft . dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft . dropPrefixes ) {
1017
+ for iter .Rewind (); iter .Valid (); iter .Next () {
1018
+ if len (dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), dropPrefixes ) {
1018
1019
continue
1019
1020
}
1020
1021
vs := iter .Value ()
@@ -1024,23 +1025,14 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
1024
1025
}
1025
1026
b .Add (iter .Key (), iter .Value (), vp .Len )
1026
1027
}
1027
- return b
1028
- }
1029
1028
1030
- type flushTask struct {
1031
- mt * memTable
1032
- dropPrefixes [][]byte
1029
+ return b
1033
1030
}
1034
1031
1035
- // handleFlushTask must be run serially.
1036
- func (db * DB ) handleFlushTask (ft flushTask ) error {
1037
- // There can be a scenario, when empty memtable is flushed.
1038
- if ft .mt .sl .Empty () {
1039
- return nil
1040
- }
1041
-
1032
+ // handleMemTableFlush must be run serially.
1033
+ func (db * DB ) handleMemTableFlush (itr y.Iterator , dropPrefixes [][]byte ) error {
1042
1034
bopts := buildTableOptions (db )
1043
- builder := buildL0Table (ft , bopts )
1035
+ builder := buildL0Table (itr , nil , bopts )
1044
1036
defer builder .Close ()
1045
1037
1046
1038
// buildL0Table can return nil if the none of the items in the skiplist are
@@ -1069,39 +1061,62 @@ func (db *DB) handleFlushTask(ft flushTask) error {
1069
1061
return err
1070
1062
}
1071
1063
1072
- // flushMemtable must keep running until we send it an empty flushTask . If there
1073
- // are errors during handling the flush task , we'll retry indefinitely.
1064
+ // flushMemtable must keep running until we send it an empty memtable . If there
1065
+ // are errors during handling the memtable flush , we'll retry indefinitely.
1074
1066
func (db * DB ) flushMemtable (lc * z.Closer ) error {
1075
1067
defer lc .Done ()
1076
1068
1077
- for ft := range db .flushChan {
1078
- if ft .mt == nil {
1079
- // We close db.flushChan now, instead of sending a nil ft.mt.
1080
- continue
1081
- }
1082
- for {
1083
- err := db .handleFlushTask (ft )
1084
- if err == nil {
1069
+ var sz int64
1070
+ var itrs []y.Iterator
1071
+ var mts []* memTable
1072
+ for { //nolint:gosimple
1073
+ select {
1074
+ case mt , ok := <- db .flushChan :
1075
+ if mt != nil {
1076
+ itrs = append (itrs , mt .sl .NewUniIterator (false ))
1077
+ mts = append (mts , mt )
1078
+ sz += mt .sl .MemSize ()
1079
+ if sz < db .opt .MemTableSize {
1080
+ continue
1081
+ }
1082
+ }
1083
+
1084
+ if ! ok && len (mts ) == 0 {
1085
+ return nil
1086
+ }
1087
+ if len (mts ) == 0 {
1088
+ continue
1089
+ }
1090
+
1091
+ mitr := table .NewMergeIterator (itrs , false )
1092
+ for {
1093
+ if err := db .handleMemTableFlush (mitr , nil ); err != nil {
1094
+ // Encountered error. Retry indefinitely.
1095
+ db .opt .Errorf ("error flushing memtable to disk: %v, retrying" , err )
1096
+ time .Sleep (time .Second )
1097
+ continue
1098
+ }
1099
+
1085
1100
// Update s.imm. Need a lock.
1086
1101
db .lock .Lock ()
1087
- // This is a single-threaded operation. ft. mt corresponds to the head of
1088
- // db.imm list. Once we flush it, we advance db.imm. The next ft. mt
1102
+ // This is a single-threaded operation. mt corresponds to the head of
1103
+ // db.imm list. Once we flush it, we advance db.imm. The next mt
1089
1104
// which would arrive here would match db.imm[0], because we acquire a
1090
1105
// lock over DB when pushing to flushChan.
1091
1106
// TODO: This logic is dirty AF. Any change and this could easily break.
1092
- y .AssertTrue (ft .mt == db .imm [0 ])
1093
- db .imm = db .imm [1 :]
1094
- ft .mt .DecrRef () // Return memory.
1107
+ for _ , mt := range mts {
1108
+ y .AssertTrue (mt == db .imm [0 ])
1109
+ db .imm = db .imm [1 :]
1110
+ mt .DecrRef () // Return memory.
1111
+ }
1095
1112
db .lock .Unlock ()
1096
-
1097
1113
break
1098
1114
}
1099
- // Encountered error. Retry indefinitely.
1100
- db . opt . Errorf ( "Failure while flushing memtable to disk: %v. Retrying... \n " , err )
1101
- time . Sleep ( time . Second )
1115
+
1116
+ // Reset everything.
1117
+ itrs , mts , sz = itrs [: 0 ], mts [: 0 ], 0
1102
1118
}
1103
1119
}
1104
- return nil
1105
1120
}
1106
1121
1107
1122
func exists (path string ) (bool , error ) {
@@ -1521,7 +1536,7 @@ func (db *DB) startCompactions() {
1521
1536
func (db * DB ) startMemoryFlush () {
1522
1537
// Start memory fluhser.
1523
1538
if db .closers .memtable != nil {
1524
- db .flushChan = make (chan flushTask , db .opt .NumMemtables )
1539
+ db .flushChan = make (chan * memTable , db .opt .NumMemtables )
1525
1540
db .closers .memtable = z .NewCloser (1 )
1526
1541
go func () {
1527
1542
_ = db .flushMemtable (db .closers .memtable )
@@ -1627,7 +1642,7 @@ func (db *DB) prepareToDrop() (func(), error) {
1627
1642
panic ("Attempting to drop data in read-only mode." )
1628
1643
}
1629
1644
// In order prepare for drop, we need to block the incoming writes and
1630
- // write it to db. Then, flush all the pending flushtask . So that, we
1645
+ // write it to db. Then, flush all the pending memtable . So that, we
1631
1646
// don't miss any entries.
1632
1647
if err := db .blockWrite (); err != nil {
1633
1648
return nil , err
@@ -1676,7 +1691,7 @@ func (db *DB) dropAll() (func(), error) {
1676
1691
if err != nil {
1677
1692
return f , err
1678
1693
}
1679
- // prepareToDrop will stop all the incomming write and flushes any pending flush tasks .
1694
+ // prepareToDrop will stop all the incomming write and flushes any pending memtables .
1680
1695
// Before we drop, we'll stop the compaction because anyways all the datas are going to
1681
1696
// be deleted.
1682
1697
db .stopCompactions ()
@@ -1758,13 +1773,9 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
1758
1773
memtable .DecrRef ()
1759
1774
continue
1760
1775
}
1761
- task := flushTask {
1762
- mt : memtable ,
1763
- // Ensure that the head of value log gets persisted to disk.
1764
- dropPrefixes : filtered ,
1765
- }
1776
+ itr := memtable .sl .NewUniIterator (false )
1766
1777
db .opt .Debugf ("Flushing memtable" )
1767
- if err := db .handleFlushTask ( task ); err != nil {
1778
+ if err := db .handleMemTableFlush ( itr , filtered ); err != nil {
1768
1779
db .opt .Errorf ("While trying to flush memtable: %v" , err )
1769
1780
return err
1770
1781
}
0 commit comments