@@ -755,13 +755,6 @@ var requestPool = sync.Pool{
755
755
}
756
756
757
757
func (db * DB ) writeToLSM (b * request ) error {
758
- // We should check the length of b.Prts and b.Entries only when badger is not
759
- // running in InMemory mode. In InMemory mode, we don't write anything to the
760
- // value log and that's why the length of b.Ptrs will always be zero.
761
- if ! db .opt .InMemory && len (b .Ptrs ) != len (b .Entries ) {
762
- return errors .Errorf ("Ptrs and Entries don't match: %+v" , b )
763
- }
764
-
765
758
for i , entry := range b .Entries {
766
759
var err error
767
760
if entry .skipVlogAndSetThreshold (db .valueThreshold ()) {
@@ -826,6 +819,7 @@ func (db *DB) writeRequests(reqs []*request) error {
826
819
}
827
820
count += len (b .Entries )
828
821
var i uint64
822
+ var err error
829
823
for err = db .ensureRoomForWrite (); err == errNoRoom ; err = db .ensureRoomForWrite () {
830
824
i ++
831
825
if i % 100 == 0 {
@@ -1010,10 +1004,16 @@ func arenaSize(opt Options) int64 {
1010
1004
1011
1005
// buildL0Table builds a new table from the memtable.
1012
1006
func buildL0Table (ft flushTask , bopts table.Options ) * table.Builder {
1013
- iter := ft .mt .sl .NewIterator ()
1007
+ var iter y.Iterator
1008
+ if ft .itr != nil {
1009
+ iter = ft .itr
1010
+ } else {
1011
+ iter = ft .mt .sl .NewUniIterator (false )
1012
+ }
1014
1013
defer iter .Close ()
1014
+
1015
1015
b := table .NewTableBuilder (bopts )
1016
- for iter .SeekToFirst (); iter .Valid (); iter .Next () {
1016
+ for iter .Rewind (); iter .Valid (); iter .Next () {
1017
1017
if len (ft .dropPrefixes ) > 0 && hasAnyPrefixes (iter .Key (), ft .dropPrefixes ) {
1018
1018
continue
1019
1019
}
@@ -1029,16 +1029,13 @@ func buildL0Table(ft flushTask, bopts table.Options) *table.Builder {
1029
1029
1030
1030
type flushTask struct {
1031
1031
mt * memTable
1032
+ itr y.Iterator
1032
1033
dropPrefixes [][]byte
1033
1034
}
1034
1035
1035
1036
// handleFlushTask must be run serially.
1036
1037
func (db * DB ) handleFlushTask (ft flushTask ) error {
1037
- // There can be a scenario, when empty memtable is flushed.
1038
- if ft .mt .sl .Empty () {
1039
- return nil
1040
- }
1041
-
1038
+ // ft.mt could be nil with ft.itr being the valid field.
1042
1039
bopts := buildTableOptions (db )
1043
1040
builder := buildL0Table (ft , bopts )
1044
1041
defer builder .Close ()
@@ -1074,11 +1071,48 @@ func (db *DB) handleFlushTask(ft flushTask) error {
1074
1071
func (db * DB ) flushMemtable (lc * z.Closer ) error {
1075
1072
defer lc .Done ()
1076
1073
1074
+ var sz int64
1075
+ var itrs []y.Iterator
1076
+ var mts []* memTable
1077
+ slurp := func () {
1078
+ for {
1079
+ select {
1080
+ case more := <- db .flushChan :
1081
+ if more .mt == nil {
1082
+ return
1083
+ }
1084
+ sl := more .mt .sl
1085
+ itrs = append (itrs , sl .NewUniIterator (false ))
1086
+ mts = append (mts , more .mt )
1087
+
1088
+ sz += sl .MemSize ()
1089
+ if sz > db .opt .MemTableSize {
1090
+ return
1091
+ }
1092
+ default :
1093
+ return
1094
+ }
1095
+ }
1096
+ }
1097
+
1077
1098
for ft := range db .flushChan {
1078
1099
if ft .mt == nil {
1079
1100
// We close db.flushChan now, instead of sending a nil ft.mt.
1080
1101
continue
1081
1102
}
1103
+ sz = ft .mt .sl .MemSize ()
1104
+ // Reset of itrs, mts etc. is being done below.
1105
+ y .AssertTrue (len (itrs ) == 0 && len (mts ) == 0 )
1106
+ itrs = append (itrs , ft .mt .sl .NewUniIterator (false ))
1107
+ mts = append (mts , ft .mt )
1108
+
1109
+ // Pick more memtables, so we can really fill up the L0 table.
1110
+ slurp ()
1111
+
1112
+ // db.opt.Infof("Picked %d memtables. Size: %d\n", len(itrs), sz)
1113
+ ft .mt = nil
1114
+ ft .itr = table .NewMergeIterator (itrs , false )
1115
+
1082
1116
for {
1083
1117
err := db .handleFlushTask (ft )
1084
1118
if err == nil {
@@ -1089,9 +1123,11 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
1089
1123
// which would arrive here would match db.imm[0], because we acquire a
1090
1124
// lock over DB when pushing to flushChan.
1091
1125
// TODO: This logic is dirty AF. Any change and this could easily break.
1092
- y .AssertTrue (ft .mt == db .imm [0 ])
1093
- db .imm = db .imm [1 :]
1094
- ft .mt .DecrRef () // Return memory.
1126
+ for _ , mt := range mts {
1127
+ y .AssertTrue (mt == db .imm [0 ])
1128
+ db .imm = db .imm [1 :]
1129
+ mt .DecrRef () // Return memory.
1130
+ }
1095
1131
db .lock .Unlock ()
1096
1132
1097
1133
break
@@ -1100,6 +1136,8 @@ func (db *DB) flushMemtable(lc *z.Closer) error {
1100
1136
db .opt .Errorf ("Failure while flushing memtable to disk: %v. Retrying...\n " , err )
1101
1137
time .Sleep (time .Second )
1102
1138
}
1139
+ // Reset everything.
1140
+ itrs , mts , sz = itrs [:0 ], mts [:0 ], 0
1103
1141
}
1104
1142
return nil
1105
1143
}
0 commit comments