From e805a16c7720c86fa29f309fece1c3a53b17e308 Mon Sep 17 00:00:00 2001
From: Ibrahim Jarif
Date: Sat, 10 Aug 2019 02:00:44 +0530
Subject: [PATCH] Vendor latest badger. (#3784)

This change includes the [breaking/format] change dgraph-io/badger@88d5a3c
that changes the underlying data format in Badger.
---
 vendor/github.com/dgraph-io/badger/backup.go  | 27 ++++---
 vendor/github.com/dgraph-io/badger/batch.go   |  9 +++
 vendor/github.com/dgraph-io/badger/db.go      | 19 ++---
 vendor/github.com/dgraph-io/badger/levels.go  |  2 +-
 .../github.com/dgraph-io/badger/managed_db.go | 13 +++
 .../github.com/dgraph-io/badger/manifest.go   |  8 +-
 .../dgraph-io/badger/stream_writer.go         |  9 +--
 vendor/github.com/dgraph-io/badger/structs.go | 15 ++--
 .../dgraph-io/badger/table/builder.go         | 52 +++++-------
 .../dgraph-io/badger/table/iterator.go        | 80 ++++++++++++-------
 .../dgraph-io/badger/table/table.go           | 31 +++----
 vendor/github.com/dgraph-io/badger/value.go   |  6 +-
 vendor/github.com/dgraph-io/badger/y/y.go     | 40 ++++++++++
 vendor/vendor.json                            | 34 ++++----
 14 files changed, 212 insertions(+), 133 deletions(-)

diff --git a/vendor/github.com/dgraph-io/badger/backup.go b/vendor/github.com/dgraph-io/badger/backup.go
index 2569b310050..e4bdc8861ca 100644
--- a/vendor/github.com/dgraph-io/badger/backup.go
+++ b/vendor/github.com/dgraph-io/badger/backup.go
@@ -129,9 +129,10 @@ func writeTo(list *pb.KVList, w io.Writer) error {
 // KVLoader is used to write KVList objects into badger. It can be used to restore a backup.
 type KVLoader struct {
-	db       *DB
-	throttle *y.Throttle
-	entries  []*Entry
+	db          *DB
+	throttle    *y.Throttle
+	entries     []*Entry
+	entriesSize int64
 }
 
 // NewKVLoader returns a new instance of KVLoader.
@@ -139,6 +140,7 @@ func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader {
 	return &KVLoader{
 		db:       db,
 		throttle: y.NewThrottle(maxPendingWrites),
+		entries:  make([]*Entry, 0, db.opt.maxBatchCount),
 	}
 }
 
@@ -151,17 +153,23 @@ func (l *KVLoader) Set(kv *pb.KV) error {
 	if len(kv.Meta) > 0 {
 		meta = kv.Meta[0]
 	}
-
-	l.entries = append(l.entries, &Entry{
+	e := &Entry{
 		Key:       y.KeyWithTs(kv.Key, kv.Version),
 		Value:     kv.Value,
 		UserMeta:  userMeta,
 		ExpiresAt: kv.ExpiresAt,
 		meta:      meta,
-	})
-	if len(l.entries) >= 1000 {
-		return l.send()
 	}
+	estimatedSize := int64(e.estimateSize(l.db.opt.ValueThreshold))
+	// Flush entries if inserting the next entry would overflow the transactional limits.
+	if int64(len(l.entries))+1 >= l.db.opt.maxBatchCount ||
+		l.entriesSize+estimatedSize >= l.db.opt.maxBatchSize {
+		if err := l.send(); err != nil {
+			return err
+		}
+	}
+	l.entries = append(l.entries, e)
+	l.entriesSize += estimatedSize
 	return nil
 }
 
@@ -175,7 +183,8 @@ func (l *KVLoader) send() error {
 		return err
 	}
 
-	l.entries = make([]*Entry, 0, 1000)
+	l.entries = make([]*Entry, 0, l.db.opt.maxBatchCount)
+	l.entriesSize = 0
 	return nil
 }
 
diff --git a/vendor/github.com/dgraph-io/badger/batch.go b/vendor/github.com/dgraph-io/badger/batch.go
index c94e0fed472..76230a0b17f 100644
--- a/vendor/github.com/dgraph-io/badger/batch.go
+++ b/vendor/github.com/dgraph-io/badger/batch.go
@@ -29,6 +29,7 @@ type WriteBatch struct {
 	db       *DB
 	throttle *y.Throttle
 	err      error
+	commitTs uint64
 }
 
 // NewWriteBatch creates a new WriteBatch. This provides a way to conveniently do a lot of writes,
 // creating and committing transactions. Due to the nature of SSI guarantees provided by Badger,
 // blind writes can never encounter transaction conflicts (ErrConflict).
func (db *DB) NewWriteBatch() *WriteBatch { + if db.opt.managedTxns { + panic("cannot use NewWriteBatch in managed mode. Use NewWriteBatchAt instead") + } + return db.newWriteBatch() +} + +func (db *DB) newWriteBatch() *WriteBatch { return &WriteBatch{ db: db, txn: db.newTransaction(true, true), @@ -136,6 +144,7 @@ func (wb *WriteBatch) commit() error { wb.txn.CommitWith(wb.callback) wb.txn = wb.db.newTransaction(true, true) wb.txn.readTs = 0 // We're not reading anything. + wb.txn.commitTs = wb.commitTs return wb.err } diff --git a/vendor/github.com/dgraph-io/badger/db.go b/vendor/github.com/dgraph-io/badger/db.go index 29460656135..a4ac1ae0a4d 100644 --- a/vendor/github.com/dgraph-io/badger/db.go +++ b/vendor/github.com/dgraph-io/badger/db.go @@ -132,8 +132,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { nv = make([]byte, len(e.Value)) copy(nv, e.Value) } else { - nv = make([]byte, vptrSize) - vp.Encode(nv) + nv = vp.Encode() meta = meta | bitValuePointer } @@ -605,10 +604,9 @@ func (db *DB) writeToLSM(b *request) error { ExpiresAt: entry.ExpiresAt, }) } else { - var offsetBuf [vptrSize]byte db.mt.Put(entry.Key, y.ValueStruct{ - Value: b.Ptrs[i].Encode(offsetBuf[:]), + Value: b.Ptrs[i].Encode(), Meta: entry.meta | bitValuePointer, UserMeta: entry.UserMeta, ExpiresAt: entry.ExpiresAt, @@ -858,9 +856,7 @@ func writeLevel0Table(ft flushTask, f io.Writer, bopts table.Options) error { if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) { continue } - if err := b.Add(iter.Key(), iter.Value()); err != nil { - return err - } + b.Add(iter.Key(), iter.Value()) } _, err := f.Write(b.Finish()) return err @@ -883,13 +879,12 @@ func (db *DB) handleFlushTask(ft flushTask) error { // Store badger head even if vptr is zero, need it for readTs db.opt.Debugf("Storing value log head: %+v\n", ft.vptr) db.elog.Printf("Storing offset: %+v\n", ft.vptr) - offset := make([]byte, vptrSize) - ft.vptr.Encode(offset) + val := ft.vptr.Encode() // Pick the max commit ts, so in case of crash, our read ts would be higher than all the // commits. headTs := y.KeyWithTs(head, db.orc.nextTs()) - ft.mt.Put(headTs, y.ValueStruct{Value: offset}) + ft.mt.Put(headTs, y.ValueStruct{Value: val}) fileID := db.lc.reserveFileID() fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true) @@ -1013,7 +1008,7 @@ func (db *DB) calculateSize() { if db.opt.ValueDir != db.opt.Dir { _, vlogSize = totalSize(db.opt.ValueDir) } - y.VlogSize.Set(db.opt.Dir, newInt(vlogSize)) + y.VlogSize.Set(db.opt.ValueDir, newInt(vlogSize)) } func (db *DB) updateSize(lc *y.Closer) { @@ -1089,7 +1084,7 @@ func (db *DB) Size() (lsm, vlog int64) { return } lsm = y.LSMSize.Get(db.opt.Dir).(*expvar.Int).Value() - vlog = y.VlogSize.Get(db.opt.Dir).(*expvar.Int).Value() + vlog = y.VlogSize.Get(db.opt.ValueDir).(*expvar.Int).Value() return } diff --git a/vendor/github.com/dgraph-io/badger/levels.go b/vendor/github.com/dgraph-io/badger/levels.go index 9c6a27080ef..05e6d99514f 100644 --- a/vendor/github.com/dgraph-io/badger/levels.go +++ b/vendor/github.com/dgraph-io/badger/levels.go @@ -564,7 +564,7 @@ func (s *levelsController) compactBuildTables( } } numKeys++ - y.Check(builder.Add(it.Key(), it.Value())) + builder.Add(it.Key(), it.Value()) } // It was true that it.Valid() at least once in the loop above, which means we // called Add() at least once, and builder is not Empty(). 
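An aside for reviewers: the backup.go change earlier in this patch drops KVLoader's fixed 1000-entry flush and instead flushes when the next entry would cross the DB's transactional limits, so a restore can never build a batch the transaction layer would reject. A minimal standalone sketch of that rule, with hypothetical names (the real check lives in (*KVLoader).Set):

	package main

	import "fmt"

	// shouldFlush mirrors the new condition: flush before appending the next
	// entry if doing so would exceed either maxBatchCount or maxBatchSize.
	func shouldFlush(pending int, pendingSize, nextSize, maxBatchCount, maxBatchSize int64) bool {
		return int64(pending)+1 >= maxBatchCount || pendingSize+nextSize >= maxBatchSize
	}

	func main() {
		// With maxBatchCount=3 and maxBatchSize=100, a third pending entry
		// of estimated size 10 triggers a flush by count.
		fmt.Println(shouldFlush(2, 20, 10, 3, 100)) // true
	}
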
diff --git a/vendor/github.com/dgraph-io/badger/managed_db.go b/vendor/github.com/dgraph-io/badger/managed_db.go
index 4de226ae25f..61e6b3cc281 100644
--- a/vendor/github.com/dgraph-io/badger/managed_db.go
+++ b/vendor/github.com/dgraph-io/badger/managed_db.go
@@ -40,6 +40,19 @@ func (db *DB) NewTransactionAt(readTs uint64, update bool) *Txn {
 	return txn
 }
 
+// NewWriteBatchAt is similar to NewWriteBatch but it allows the user to set the commit timestamp.
+// NewWriteBatchAt is supposed to be used only in managed mode.
+func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
+	if !db.opt.managedTxns {
+		panic("cannot use NewWriteBatchAt with managedDB=false. Use NewWriteBatch instead")
+	}
+
+	wb := db.newWriteBatch()
+	wb.commitTs = commitTs
+	wb.txn.commitTs = commitTs
+	return wb
+}
+
 // CommitAt commits the transaction, following the same logic as Commit(), but
 // at the given commit timestamp. This will panic if not used with managed transactions.
 //
diff --git a/vendor/github.com/dgraph-io/badger/manifest.go b/vendor/github.com/dgraph-io/badger/manifest.go
index b4a346f2b42..f854966e7b0 100644
--- a/vendor/github.com/dgraph-io/badger/manifest.go
+++ b/vendor/github.com/dgraph-io/badger/manifest.go
@@ -223,7 +223,7 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
 var magicText = [4]byte{'B', 'd', 'g', 'r'}
 
 // The magic version number.
-const magicVersion = 6
+const magicVersion = 7
 
 func helpRewrite(dir string, m *Manifest) (*os.File, int, error) {
 	rewritePath := filepath.Join(dir, manifestRewriteFilename)
@@ -340,7 +340,7 @@ func ReplayManifestFile(fp *os.File) (ret Manifest, truncOffset int64, err error
 	if !bytes.Equal(magicBuf[0:4], magicText[:]) {
 		return Manifest{}, 0, errBadMagic
 	}
-	version := binary.BigEndian.Uint32(magicBuf[4:8])
+	version := y.BytesToU32(magicBuf[4:8])
 	if version != magicVersion {
 		return Manifest{}, 0, fmt.Errorf("manifest has unsupported version: %d (we support %d)",
 			version, magicVersion)
 	}
@@ -358,7 +358,7 @@ func ReplayManifestFile(fp *os.File) (ret Manifest, truncOffset int64, err error
 		}
 		return Manifest{}, 0, err
 	}
-	length := binary.BigEndian.Uint32(lenCrcBuf[0:4])
+	length := y.BytesToU32(lenCrcBuf[0:4])
 	var buf = make([]byte, length)
 	if _, err := io.ReadFull(&r, buf); err != nil {
 		if err == io.EOF || err == io.ErrUnexpectedEOF {
 			break
 		}
 		return Manifest{}, 0, err
 	}
-	if crc32.Checksum(buf, y.CastagnoliCrcTable) != binary.BigEndian.Uint32(lenCrcBuf[4:8]) {
+	if crc32.Checksum(buf, y.CastagnoliCrcTable) != y.BytesToU32(lenCrcBuf[4:8]) {
 		return Manifest{}, 0, errBadChecksum
 	}
 
diff --git a/vendor/github.com/dgraph-io/badger/stream_writer.go b/vendor/github.com/dgraph-io/badger/stream_writer.go
index cff0a97e933..f0c193ce0fd 100644
--- a/vendor/github.com/dgraph-io/badger/stream_writer.go
+++ b/vendor/github.com/dgraph-io/badger/stream_writer.go
@@ -156,8 +156,7 @@ func (sw *StreamWriter) Flush() error {
 	}
 
 	// Encode and write the value log head into a new table.
-	data := make([]byte, vptrSize)
-	maxHead.Encode(data)
+	data := maxHead.Encode()
 	headWriter := sw.newWriter(headStreamId)
 	if err := headWriter.Add(
 		y.KeyWithTs(head, sw.maxVersion),
@@ -247,9 +246,8 @@ func (w *sortedWriter) handleRequests(closer *y.Closer) {
 					ExpiresAt: e.ExpiresAt,
 				}
 			} else {
-				vbuf := make([]byte, vptrSize)
 				vs = y.ValueStruct{
-					Value:     vptr.Encode(vbuf),
+					Value:     vptr.Encode(),
 					Meta:      e.meta | bitValuePointer,
 					UserMeta:  e.UserMeta,
 					ExpiresAt: e.ExpiresAt,
@@ -290,7 +288,8 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
 	}
 
 	w.lastKey = y.SafeCopy(w.lastKey, key)
-	return w.builder.Add(key, vs)
+	w.builder.Add(key, vs)
+	return nil
 }
 
 func (w *sortedWriter) send() error {
diff --git a/vendor/github.com/dgraph-io/badger/structs.go b/vendor/github.com/dgraph-io/badger/structs.go
index 0108fded077..14456270efc 100644
--- a/vendor/github.com/dgraph-io/badger/structs.go
+++ b/vendor/github.com/dgraph-io/badger/structs.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"hash/crc32"
 	"time"
+	"unsafe"
 
 	"github.com/dgraph-io/badger/y"
 )
@@ -33,18 +34,16 @@ func (p valuePointer) IsZero() bool {
 
 const vptrSize = 12
 
 // Encode encodes Pointer into byte buffer.
-func (p valuePointer) Encode(b []byte) []byte {
-	binary.BigEndian.PutUint32(b[:4], p.Fid)
-	binary.BigEndian.PutUint32(b[4:8], p.Len)
-	binary.BigEndian.PutUint32(b[8:12], p.Offset)
-	return b[:vptrSize]
+func (p valuePointer) Encode() []byte {
+	b := make([]byte, vptrSize)
+	// Copy over the content from p to b.
+	*(*valuePointer)(unsafe.Pointer(&b[0])) = p
+	return b
 }
 
-// Decode decodes the value pointer from the provided byte buffer.
+// Decode decodes the value pointer from the provided byte buffer.
 func (p *valuePointer) Decode(b []byte) {
-	p.Fid = binary.BigEndian.Uint32(b[:4])
-	p.Len = binary.BigEndian.Uint32(b[4:8])
-	p.Offset = binary.BigEndian.Uint32(b[8:12])
+	*p = *(*valuePointer)(unsafe.Pointer(&b[0]))
 }
 
 // header is used in value log as a header before Entry.
diff --git a/vendor/github.com/dgraph-io/badger/table/builder.go b/vendor/github.com/dgraph-io/badger/table/builder.go
index 8719c04d1b8..32521959084 100644
--- a/vendor/github.com/dgraph-io/badger/table/builder.go
+++ b/vendor/github.com/dgraph-io/badger/table/builder.go
@@ -18,8 +18,8 @@ package table
 
 import (
 	"bytes"
-	"encoding/binary"
 	"math"
+	"unsafe"
 
 	"github.com/dgryski/go-farm"
 
@@ -37,26 +37,23 @@ func newBuffer(sz int) *bytes.Buffer {
 type header struct {
 	plen uint16 // Overlap with base key.
 	klen uint16 // Length of the diff.
-	vlen uint32 // Length of value.
 }
 
 // Encode encodes the header.
-func (h header) Encode(b []byte) {
-	binary.BigEndian.PutUint16(b[0:2], h.plen)
-	binary.BigEndian.PutUint16(b[2:4], h.klen)
-	binary.BigEndian.PutUint32(b[4:8], h.vlen)
+func (h header) Encode() []byte {
+	var b [4]byte
+	*(*header)(unsafe.Pointer(&b[0])) = h
+	return b[:]
 }
 
 // Decode decodes the header.
 func (h *header) Decode(buf []byte) int {
-	h.plen = binary.BigEndian.Uint16(buf[0:2])
-	h.klen = binary.BigEndian.Uint16(buf[2:4])
-	h.vlen = binary.BigEndian.Uint32(buf[4:8])
+	*h = *(*header)(unsafe.Pointer(&buf[0]))
 	return h.Size()
 }
 
 // Size returns size of the header. Currently it's just a constant.
-func (h header) Size() int { return 8 }
+func (h header) Size() int { return 4 }
 
 // Builder is used in building a table.
type Builder struct { @@ -117,17 +114,14 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct) { h := header{ plen: uint16(len(key) - len(diffKey)), klen: uint16(len(diffKey)), - vlen: uint32(v.EncodedSize()), } // store current entry's offset - y.AssertTrue(b.buf.Len() < math.MaxUint32) + y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32) b.entryOffsets = append(b.entryOffsets, uint32(b.buf.Len())-b.baseOffset) // Layout: header, diffKey, value. - var hbuf [8]byte - h.Encode(hbuf[:]) - b.buf.Write(hbuf[:]) + b.buf.Write(h.Encode()) b.buf.Write(diffKey) // We only need to store the key difference. v.EncodeTo(b.buf) @@ -145,12 +139,8 @@ Structure of Block. +-----------------------------------------+--------------------+--------------+------------------+ */ func (b *Builder) finishBlock() { - ebuf := make([]byte, len(b.entryOffsets)*4+4) - for i, offset := range b.entryOffsets { - binary.BigEndian.PutUint32(ebuf[4*i:4*i+4], uint32(offset)) - } - binary.BigEndian.PutUint32(ebuf[len(ebuf)-4:], uint32(len(b.entryOffsets))) - b.buf.Write(ebuf) + b.buf.Write(y.U32SliceToBytes(b.entryOffsets)) + b.buf.Write(y.U32ToBytes(uint32(len(b.entryOffsets)))) blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block. b.writeChecksum(blockBuf) @@ -173,7 +163,8 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { return false } - y.AssertTrue((len(b.entryOffsets)+1)*4+4+8+4 < math.MaxUint32) // check for below statements + // Integer overflow check for statements below. + y.AssertTrue((uint32(len(b.entryOffsets))+1)*4+4+8+4 < math.MaxUint32) // We should include current entry also in size, that's why +1 to len(b.entryOffsets). entriesOffsetsSize := uint32((len(b.entryOffsets)+1)*4 + 4 + // size of list @@ -186,17 +177,16 @@ func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { } // Add adds a key-value pair to the block. -func (b *Builder) Add(key []byte, value y.ValueStruct) error { +func (b *Builder) Add(key []byte, value y.ValueStruct) { if b.shouldFinishBlock(key, value) { b.finishBlock() // Start a new block. Initialize the block. b.baseKey = []byte{} - y.AssertTrue(b.buf.Len() < math.MaxUint32) + y.AssertTrue(uint32(b.buf.Len()) < math.MaxUint32) b.baseOffset = uint32(b.buf.Len()) b.entryOffsets = b.entryOffsets[:0] } b.addHelper(key, value) - return nil // Currently, there is no meaningful error. } // TODO: vvv this was the comment on ReachedCapacity. @@ -246,11 +236,9 @@ func (b *Builder) Finish() []byte { n, err := b.buf.Write(index) y.Check(err) - y.AssertTrue(n < math.MaxUint32) + y.AssertTrue(uint32(n) < math.MaxUint32) // Write index size. - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(n)) - _, err = b.buf.Write(buf[:]) + _, err = b.buf.Write(y.U32ToBytes(uint32(n))) y.Check(err) b.writeChecksum(index) @@ -278,10 +266,8 @@ func (b *Builder) writeChecksum(data []byte) { n, err := b.buf.Write(chksum) y.Check(err) - y.AssertTrue(n < math.MaxUint32) + y.AssertTrue(uint32(n) < math.MaxUint32) // Write checksum size. 
-	var buf [4]byte
-	binary.BigEndian.PutUint32(buf[:], uint32(n))
-	_, err = b.buf.Write(buf[:])
+	_, err = b.buf.Write(y.U32ToBytes(uint32(n)))
 	y.Check(err)
 }
diff --git a/vendor/github.com/dgraph-io/badger/table/iterator.go b/vendor/github.com/dgraph-io/badger/table/iterator.go
index c1df37ad8f9..22f0f707090 100644
--- a/vendor/github.com/dgraph-io/badger/table/iterator.go
+++ b/vendor/github.com/dgraph-io/badger/table/iterator.go
@@ -18,7 +18,6 @@ package table
 
 import (
 	"bytes"
-	"encoding/binary"
 	"io"
 	"sort"
 
@@ -32,6 +31,7 @@ type blockIterator struct {
 	err     error
 	baseKey []byte
 
 	numEntries int
+	entryOffsets      []uint32
 	entriesIndexStart int
 	currentIdx int
 
 func (itr *blockIterator) Reset() {
 	itr.pos = 0
 	itr.err = nil
-	itr.baseKey = []byte{}
-	itr.key = []byte{}
-	itr.val = []byte{}
+	itr.baseKey = itr.baseKey[:0]
+	itr.key = itr.key[:0]
+	itr.val = itr.val[:0]
 	itr.init = false
 	itr.currentIdx = -1
 }
 
+// invalidatePointer detaches the block iterator from its current block.
+func (itr *blockIterator) invalidatePointer() {
+	itr.data = nil
+	itr.numEntries = -1
+	itr.entriesIndexStart = -1
+}
+
+// isInvalidPointer reports whether the block iterator is not attached to any block.
+func (itr *blockIterator) isInvalidPointer() bool {
+	return itr.data == nil && itr.numEntries == -1 && itr.entriesIndexStart == -1
+}
+
 func (itr *blockIterator) Init() {
 	if !itr.init {
 		itr.currentIdx = -1
@@ -73,15 +85,10 @@ var (
 	current = 1
 )
 
-func (itr *blockIterator) getOffset(idx int) uint32 {
-	y.AssertTrue(idx >= 0 && idx < itr.numEntries)
-	return binary.BigEndian.Uint32(itr.data[itr.entriesIndexStart+4*idx:])
-}
-
 func (itr *blockIterator) getKey(idx int) []byte {
 	y.AssertTrue(idx >= 0 && idx < itr.numEntries)
 
-	idxPos := itr.getOffset(idx)
+	idxPos := itr.entryOffsets[idx]
 	var h header
 	idxPos += uint32(h.Decode(itr.data[idxPos:]))
@@ -128,7 +135,7 @@ func (itr *blockIterator) Seek(key []byte, whence int) {
 
 	// Found first idx for which key is >= key to be sought.
 	itr.currentIdx = idx
-	itr.pos = itr.getOffset(itr.currentIdx)
+	itr.pos = itr.entryOffsets[itr.currentIdx]
 	var h header
 	itr.pos += uint32(h.Decode(itr.data[itr.pos:]))
 	itr.parseKV(h)
@@ -159,13 +166,24 @@ func (itr *blockIterator) parseKV(h header) {
 	copy(itr.key[h.plen:], itr.data[itr.pos:itr.pos+uint32(h.klen)])
 	itr.pos += uint32(h.klen)
 
-	if itr.pos+uint32(h.vlen) > uint32(len(itr.data)) {
-		itr.err = errors.Errorf("Value exceeded size of block: %d %d %d %d %v",
-			itr.pos, h.klen, h.vlen, len(itr.data), h)
+	var valEndOffset uint32
+	// We're at the last entry in the block.
+	if itr.currentIdx == itr.numEntries-1 {
+		valEndOffset = uint32(itr.entriesIndexStart)
+	} else {
+		// Get starting offset of the next entry which is the end of the current entry.
+		valEndOffset = itr.entryOffsets[itr.currentIdx+1]
+	}
+
+	if valEndOffset > uint32(len(itr.data)) {
+		itr.err = errors.Errorf("Value endoffset exceeded size of block. "+
+			"Pos:%d Len:%d EndOffset:%d Header:%v", itr.pos, len(itr.data), valEndOffset, h)
 		return
 	}
-	itr.val = y.SafeCopy(itr.val, itr.data[itr.pos:itr.pos+uint32(h.vlen)])
-	itr.pos += uint32(h.vlen)
+	// TODO (ibrahim): Can we avoid this copy?
+	itr.val = y.SafeCopy(itr.val, itr.data[itr.pos:valEndOffset])
+	// Set pos to the end of current entry.
+	itr.pos = valEndOffset
 }
 
 func (itr *blockIterator) Next() {
@@ -203,7 +221,7 @@ func (itr *blockIterator) Prev() {
 		return
 	}
 
-	itr.pos = itr.getOffset(itr.currentIdx)
+	itr.pos = itr.entryOffsets[itr.currentIdx]
 	var h header
 	y.AssertTruef(itr.pos < uint32(len(itr.data)), "%d %d", itr.pos, len(itr.data))
 
@@ -240,7 +258,9 @@ type Iterator struct {
 // NewIterator returns a new iterator of the Table
 func (t *Table) NewIterator(reversed bool) *Iterator {
 	t.IncrRef() // Important.
-	ti := &Iterator{t: t, reversed: reversed}
+	bi := &blockIterator{}
+	bi.invalidatePointer()
+	ti := &Iterator{t: t, reversed: reversed, bi: bi}
 	ti.next()
 	return ti
 }
@@ -252,6 +272,7 @@ func (itr *Iterator) Close() error {
 
 func (itr *Iterator) reset() {
 	itr.bpos = 0
+	itr.bi.invalidatePointer()
 	itr.err = nil
 }
 
@@ -272,7 +293,8 @@ func (itr *Iterator) seekToFirst() {
 		itr.err = err
 		return
 	}
-	itr.bi = block.NewIterator()
+
+	block.resetIterator(itr.bi)
 	itr.bi.SeekToFirst()
 	itr.err = itr.bi.Error()
 }
@@ -289,7 +311,8 @@ func (itr *Iterator) seekToLast() {
 		itr.err = err
 		return
 	}
-	itr.bi = block.NewIterator()
+
+	block.resetIterator(itr.bi)
 	itr.bi.SeekToLast()
 	itr.err = itr.bi.Error()
 }
@@ -301,7 +324,8 @@ func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
 		itr.err = err
 		return
 	}
-	itr.bi = block.NewIterator()
+
+	block.resetIterator(itr.bi)
 	itr.bi.Seek(key, origin)
 	itr.err = itr.bi.Error()
 }
@@ -368,13 +392,14 @@ func (itr *Iterator) next() {
 		return
 	}
 
-	if itr.bi == nil {
+	if itr.bi.isInvalidPointer() {
 		block, err := itr.t.block(itr.bpos)
 		if err != nil {
 			itr.err = err
 			return
 		}
-		itr.bi = block.NewIterator()
+
+		block.resetIterator(itr.bi)
 		itr.bi.SeekToFirst()
 		itr.err = itr.bi.Error()
 		return
@@ -382,8 +407,8 @@ func (itr *Iterator) next() {
 
 	itr.bi.Next()
 	if !itr.bi.Valid() {
+		itr.bi.invalidatePointer()
 		itr.bpos++
-		itr.bi = nil
 		itr.next()
 		return
 	}
@@ -396,13 +421,14 @@ func (itr *Iterator) prev() {
 		return
 	}
 
-	if itr.bi == nil {
+	if itr.bi.isInvalidPointer() {
 		block, err := itr.t.block(itr.bpos)
 		if err != nil {
 			itr.err = err
 			return
 		}
-		itr.bi = block.NewIterator()
+
+		block.resetIterator(itr.bi)
 		itr.bi.SeekToLast()
 		itr.err = itr.bi.Error()
 		return
@@ -410,8 +436,8 @@ func (itr *Iterator) prev() {
 
 	itr.bi.Prev()
 	if !itr.bi.Valid() {
+		itr.bi.invalidatePointer()
 		itr.bpos--
-		itr.bi = nil
 		itr.prev()
 		return
 	}
diff --git a/vendor/github.com/dgraph-io/badger/table/table.go b/vendor/github.com/dgraph-io/badger/table/table.go
index 48e7cff4f17..8e1a2dc1a55 100644
--- a/vendor/github.com/dgraph-io/badger/table/table.go
+++ b/vendor/github.com/dgraph-io/badger/table/table.go
@@ -17,7 +17,6 @@
 package table
 
 import (
-	"encoding/binary"
 	"fmt"
 	"io"
 	"os"
@@ -125,10 +124,11 @@ type block struct {
 	data              []byte
 	numEntries        int // number of entries present in the block
 	entriesIndexStart int // start index of entryOffsets list
+	entryOffsets      []uint32
 	chkLen            int // checksum length
 }
 
-func (b block) verifyCheckSum() error {
+func (b *block) verifyCheckSum() error {
 	readPos := len(b.data) - 4 - b.chkLen
 	if readPos < 0 {
 		// This should be rare, hence can create an error instead of having global error.
@@ -143,14 +143,13 @@
 	return y.VerifyChecksum(b.data[:readPos], cs)
 }
 
-func (b block) NewIterator() *blockIterator {
-	bi := &blockIterator{
-		data:              b.data,
-		numEntries:        b.numEntries,
-		entriesIndexStart: b.entriesIndexStart,
-	}
+func (b *block) resetIterator(bi *blockIterator) {
+	bi.Reset()
 
-	return bi
+	bi.data = b.data
+	bi.numEntries = b.numEntries
+	bi.entryOffsets = b.entryOffsets
+	bi.entriesIndexStart = b.entriesIndexStart
 }
 
 // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function
@@ -275,7 +274,7 @@ func (t *Table) readIndex() error {
 	// Read checksum len from the last 4 bytes.
 	readPos -= 4
 	buf := t.readNoFail(readPos, 4)
-	checksumLen := int(binary.BigEndian.Uint32(buf))
+	checksumLen := int(y.BytesToU32(buf))
 
 	// Read checksum.
 	expectedChk := &pb.Checksum{}
@@ -288,7 +287,7 @@ func (t *Table) readIndex() error {
 	// Read index size from the footer.
 	readPos -= 4
 	buf = t.readNoFail(readPos, 4)
-	indexLen := int(binary.BigEndian.Uint32(buf))
+	indexLen := int(y.BytesToU32(buf))
 	// Read index.
 	readPos -= indexLen
 	data := t.readNoFail(readPos, indexLen)
@@ -321,13 +320,17 @@ func (t *Table) block(idx int) (*block, error) {
 
 	// Read meta data related to block.
 	readPos := len(blk.data) - 4 // First read checksum length.
-	blk.chkLen = int(binary.BigEndian.Uint32(blk.data[readPos : readPos+4]))
+	blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4]))
 
 	// Skip reading checksum, and move position to read numEntries in block.
 	readPos -= (blk.chkLen + 4)
-	blk.numEntries = int(binary.BigEndian.Uint32(blk.data[readPos : readPos+4]))
-	blk.entriesIndexStart = readPos - (blk.numEntries * 4)
+	blk.numEntries = int(y.BytesToU32(blk.data[readPos : readPos+4]))
+	entriesIndexStart := readPos - (blk.numEntries * 4)
+	entriesIndexEnd := entriesIndexStart + blk.numEntries*4
+
+	blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd])
+	blk.entriesIndexStart = entriesIndexStart
 
 	// Verify checksum if checksum verification mode is OnBlockRead or OnTableAndBlockRead.
if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead { if err = blk.verifyCheckSum(); err != nil { diff --git a/vendor/github.com/dgraph-io/badger/value.go b/vendor/github.com/dgraph-io/badger/value.go index aa12ec2d1ce..226c7e316c2 100644 --- a/vendor/github.com/dgraph-io/badger/value.go +++ b/vendor/github.com/dgraph-io/badger/value.go @@ -19,7 +19,6 @@ package badger import ( "bufio" "bytes" - "encoding/binary" "encoding/json" "fmt" "hash" @@ -244,7 +243,7 @@ func (r *safeRead) Entry(reader io.Reader) (*Entry, error) { } return nil, err } - crc := binary.BigEndian.Uint32(crcBuf[:]) + crc := y.BytesToU32(crcBuf[:]) if crc != tee.Sum32() { return nil, errTruncate } @@ -1384,11 +1383,12 @@ func (vlog *valueLog) runGC(discardRatio float64, head valuePointer) error { func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error { vlog.lfDiscardStats.Lock() + defer vlog.lfDiscardStats.Unlock() + for fid, sz := range stats { vlog.lfDiscardStats.m[fid] += sz vlog.lfDiscardStats.updatesSinceFlush++ } - vlog.lfDiscardStats.Unlock() if vlog.lfDiscardStats.updatesSinceFlush > discardStatsFlushThreshold { if err := vlog.flushDiscardStats(); err != nil { return err diff --git a/vendor/github.com/dgraph-io/badger/y/y.go b/vendor/github.com/dgraph-io/badger/y/y.go index 4948315a9ae..87facc42d17 100644 --- a/vendor/github.com/dgraph-io/badger/y/y.go +++ b/vendor/github.com/dgraph-io/badger/y/y.go @@ -23,8 +23,10 @@ import ( "hash/crc32" "math" "os" + "reflect" "sync" "time" + "unsafe" "github.com/pkg/errors" ) @@ -300,3 +302,41 @@ func (t *Throttle) Finish() error { return t.finishErr } + +// U32ToBytes converts the given Uint32 to bytes +func U32ToBytes(v uint32) []byte { + var uBuf [4]byte + binary.BigEndian.PutUint32(uBuf[:], v) + return uBuf[:] +} + +// BytesToU32 converts the given byte slice to uint32 +func BytesToU32(b []byte) uint32 { + return binary.BigEndian.Uint32(b) +} + +// U32SliceToBytes converts the given Uint32 slice to byte slice +func U32SliceToBytes(u32s []uint32) []byte { + if len(u32s) == 0 { + return nil + } + var b []byte + hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + hdr.Len = len(u32s) * 4 + hdr.Cap = hdr.Len + hdr.Data = uintptr(unsafe.Pointer(&u32s[0])) + return b +} + +// BytesToU32Slice converts the given byte slice to uint32 slice +func BytesToU32Slice(b []byte) []uint32 { + if len(b) == 0 { + return nil + } + var u32s []uint32 + hdr := (*reflect.SliceHeader)(unsafe.Pointer(&u32s)) + hdr.Len = len(b) / 4 + hdr.Cap = hdr.Len + hdr.Data = uintptr(unsafe.Pointer(&b[0])) + return u32s +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 1a9bccab729..a0fa877aba6 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -465,46 +465,46 @@ "revisionTime": "2016-09-07T16:21:46Z" }, { - "checksumSHA1": "jN1Hm75ZjUqHxpx/b9nIKxdR0Wg=", + "checksumSHA1": "4JtWm1bK1Xyw1Iz1Ir3Dq38Ce9o=", "path": "github.com/dgraph-io/badger", - "revision": "f59246cb119d2f7b6f2fc29a835ee5edb366019a", - "revisionTime": "2019-08-02T17:43:07Z" + "revision": "9d7b751e85c981d34e6351520919f5c0547c1379", + "revisionTime": "2019-08-09T12:18:31Z" }, { "checksumSHA1": "On87ObyZ4RLZWWmDtW2M5gY997I=", "path": "github.com/dgraph-io/badger/options", - "revision": "f59246cb119d2f7b6f2fc29a835ee5edb366019a", - "revisionTime": "2019-08-02T17:43:07Z" + "revision": "9d7b751e85c981d34e6351520919f5c0547c1379", + "revisionTime": "2019-08-09T12:18:31Z" }, { "checksumSHA1": "SHnr7a5ZSRsyrAT8Nw1AxDs1UXw=", "path": "github.com/dgraph-io/badger/pb", - "revision": 
"f59246cb119d2f7b6f2fc29a835ee5edb366019a", - "revisionTime": "2019-08-02T17:43:07Z" + "revision": "9d7b751e85c981d34e6351520919f5c0547c1379", + "revisionTime": "2019-08-09T12:18:31Z" }, { "checksumSHA1": "zWLyKFwJcRNWzpvE+QDrsqi9UlE=", "path": "github.com/dgraph-io/badger/skl", - "revision": "f59246cb119d2f7b6f2fc29a835ee5edb366019a", - "revisionTime": "2019-08-02T17:43:07Z" + "revision": "9d7b751e85c981d34e6351520919f5c0547c1379", + "revisionTime": "2019-08-09T12:18:31Z" }, { - "checksumSHA1": "poz2/1kTpQvUf+6jSr3vjahkqtY=", + "checksumSHA1": "/hYHtgAA2VVZ+4sdbplyOLGRiEM=", "path": "github.com/dgraph-io/badger/table", - "revision": "f59246cb119d2f7b6f2fc29a835ee5edb366019a", - "revisionTime": "2019-08-02T17:43:07Z" + "revision": "9d7b751e85c981d34e6351520919f5c0547c1379", + "revisionTime": "2019-08-09T12:18:31Z" }, { "checksumSHA1": "JxUxrgSrNNTqbX3tqzh3dQIG+uU=", "path": "github.com/dgraph-io/badger/trie", - "revision": "f59246cb119d2f7b6f2fc29a835ee5edb366019a", - "revisionTime": "2019-08-02T17:43:07Z" + "revision": "9d7b751e85c981d34e6351520919f5c0547c1379", + "revisionTime": "2019-08-09T12:18:31Z" }, { - "checksumSHA1": "HxjNg9sBt3V1Fcmtau1IFZL2OcI=", + "checksumSHA1": "pfK0K0RIzPJ4NZ+B27kgq6as/ew=", "path": "github.com/dgraph-io/badger/y", - "revision": "f59246cb119d2f7b6f2fc29a835ee5edb366019a", - "revisionTime": "2019-08-02T17:43:07Z" + "revision": "9d7b751e85c981d34e6351520919f5c0547c1379", + "revisionTime": "2019-08-09T12:18:31Z" }, { "checksumSHA1": "kKrPMMzl8wnZ8Ml1WGHcNO2X0cI=",