@@ -755,13 +755,6 @@ var requestPool = sync.Pool{
755
755
}
756
756
757
757
func (db * DB ) writeToLSM (b * request ) error {
758
- // We should check the length of b.Prts and b.Entries only when badger is not
759
- // running in InMemory mode. In InMemory mode, we don't write anything to the
760
- // value log and that's why the length of b.Ptrs will always be zero.
761
- if ! db .opt .InMemory && len (b .Ptrs ) != len (b .Entries ) {
762
- return errors .Errorf ("Ptrs and Entries don't match: %+v" , b )
763
- }
764
-
765
758
for i , entry := range b .Entries {
766
759
var err error
767
760
if entry .skipVlogAndSetThreshold (db .valueThreshold ()) {
@@ -826,6 +819,7 @@ func (db *DB) writeRequests(reqs []*request) error {
826
819
}
827
820
count += len (b .Entries )
828
821
var i uint64
822
+ var err error
829
823
for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
830
824
i ++
831
825
if i % 100 == 0 {
@@ -1010,10 +1004,16 @@ func arenaSize(opt Options) int64 {
1010
1004
1011
1005
// buildL0Table builds a new table from the memtable.
1012
1006
func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1013
- iter := ft .mt .sl .NewIterator ()
1007
+ var iter y.Iterator
1008
+ if ft .itr != nil {
1009
+ iter = ft .itr
1010
+ } else {
1011
+ iter = ft .mt .sl .NewUniIterator (false )
1012
+ }
1014
1013
defer iter .Close ()
1014
+
1015
1015
b := table .NewTableBuilder (bopts )
1016
- for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1016
+ for iter .Rewind (); iter .Valid (); iter .Next () {
1017
1017
if len (ft .dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft .dropPrefixes ) {
1018
1018
continue
1019
1019
}
@@ -1029,16 +1029,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
1029
1029
1030
1030
type flushTask struct {
1031
1031
mt * memTable
1032
+ itr y.Iterator
1032
1033
dropPrefixes [][]byte
1033
1034
}
1034
1035
1035
1036
// handleFlushTask must be run serially.
1036
1037
func (db * DB ) handleFlushTask (ft flushTask ) error {
1037
- // There can be a scenario, when empty memtable is flushed.
1038
- if ft .mt .sl .Empty () {
1039
- return nil
1040
- }
1041
-
1038
+ // ft.mt could be nil with ft.itr being the valid field.
1042
1039
bopts := buildTableOptions (db )
1043
1040
builder := buildL0Table (ft , bopts )
1044
1041
defer builder .Close ()
@@ -1074,11 +1071,51 @@ func (db *DB) handleFlushTask(ft flushTask) error {
1074
1071
func (db * DB ) flushMemtable (lc * z.Closer ) error {
1075
1072
defer lc .Done ()
1076
1073
1074
+ var sz int64
1075
+ var itrs []y.Iterator
1076
+ var mts []* memTable
1077
+ slurp := func () {
1078
+ for {
1079
+ select {
1080
+ case more , ok := <- db .flushChan :
1081
+ if ! ok {
1082
+ return
1083
+ }
1084
+ if more .mt == nil {
1085
+ continue
1086
+ }
1087
+ sl := more .mt .sl
1088
+ itrs = append (itrs , sl .NewUniIterator (false ))
1089
+ mts = append (mts , more .mt )
1090
+
1091
+ sz += sl .MemSize ()
1092
+ if sz > db .opt .MemTableSize {
1093
+ return
1094
+ }
1095
+ default :
1096
+ return
1097
+ }
1098
+ }
1099
+ }
1100
+
1077
1101
for ft := range db .flushChan {
1078
1102
if ft .mt == nil {
1079
1103
// We close db.flushChan now, instead of sending a nil ft.mt.
1080
1104
continue
1081
1105
}
1106
+ sz = ft .mt .sl .MemSize ()
1107
+ // Reset of itrs, mts etc. is being done below.
1108
+ y .AssertTrue (len (itrs ) == 0 && len (mts ) == 0 )
1109
+ itrs = append (itrs , ft .mt .sl .NewUniIterator (false ))
1110
+ mts = append (mts , ft .mt )
1111
+
1112
+ // Pick more memtables, so we can really fill up the L0 table.
1113
+ slurp ()
1114
+
1115
+ // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1116
+ ft .mt = nil
1117
+ ft .itr = table .NewMergeIterator (itrs , false )
1118
+
1082
1119
for {
1083
1120
err := db .handleFlushTask (ft )
1084
1121
if err == nil {
@@ -1089,9 +1126,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
1089
1126
// which would arrive here would match db.imm[0], because we acquire a
1090
1127
// lock over DB when pushing to flushChan.
1091
1128
// TODO: This logic is dirty AF. Any change and this could easily break.
1092
- y .AssertTrue (ft .mt == db .imm [0 ])
1093
- db .imm = db .imm [1 :]
1094
- ft .mt .DecrRef () // Return memory.
1129
+ for _ , mt := range mts {
1130
+ y .AssertTrue (mt == db .imm [0 ])
1131
+ db .imm = db .imm [1 :]
1132
+ mt .DecrRef () // Return memory.
1133
+ }
1095
1134
db .lock .Unlock ()
1096
1135
1097
1136
break
@@ -1100,6 +1139,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
1100
1139
db .opt .Errorf ("Failure while flushing memtable to disk: %v. Retrying...\n " , err )
1101
1140
time .Sleep (time .Second )
1102
1141
}
1142
+ // Reset everything.
1143
+ itrs , mts , sz = itrs [:0 ], mts [:0 ], 0
1103
1144
}
1104
1145
return nil
1105
1146
}
0 commit comments