From f6f3b5a3725178e173350cfdbe4aa8576477a6cc Mon Sep 17 00:00:00 2001
From: Daniel Mai
Date: Thu, 24 Oct 2019 11:20:21 -0700
Subject: [PATCH] release/v1.0: Vendor in Badger with only the discard stats fix. (#4212)

This vendors in Badger v1.6.0 with dgraph-io/badger#929 cherry-picked into it
(dgraph-io/badger#1098) to avoid any breaking changes to the data format.

Vendor in Badger to fix a bug that prevents Alpha from starting up
successfully. This happens in v1.0.16 and v1.0.17.

    Error while creating badger KV posting store error: Unable to find log file. Please retry

The release/v1.0 branch uses the vendor directory for dependencies. Badger was
pulled in with the following command:

    govendor fetch github.com/dgraph-io/badger/...@efb9d9d15d7f7baa656e04933058f006c33a8d0f
---
 .../github.com/dgraph-io/badger/CHANGELOG.md  |  13 +-
 vendor/github.com/dgraph-io/badger/README.md  |  38 +-
 .../github.com/dgraph-io/badger/VERSIONING.md |  47 --
 vendor/github.com/dgraph-io/badger/backup.go  |  64 +++-
 .../github.com/dgraph-io/badger/compaction.go |   5 +-
 vendor/github.com/dgraph-io/badger/db.go      | 145 +---
 vendor/github.com/dgraph-io/badger/errors.go  |   8 +-
 vendor/github.com/dgraph-io/badger/go.sum     |   1 -
 vendor/github.com/dgraph-io/badger/levels.go  | 128 +--
 .../github.com/dgraph-io/badger/manifest.go   |  16 +-
 vendor/github.com/dgraph-io/badger/options.go |  57 +-
 .../dgraph-io/badger/options/options.go       |  15 -
 .../github.com/dgraph-io/badger/pb/pb.pb.go   | 743 +-----------------
 .../github.com/dgraph-io/badger/pb/pb.proto   |  21 +-
 .../github.com/dgraph-io/badger/publisher.go  |  64 +-
 .../github.com/dgraph-io/badger/skl/arena.go  |   4 +-
 vendor/github.com/dgraph-io/badger/skl/skl.go |  10 +-
 .../dgraph-io/badger/stream_writer.go         |   9 +-
 .../dgraph-io/badger/table/README.md          |  87 +-
 .../dgraph-io/badger/table/builder.go         | 191 ++---
 .../dgraph-io/badger/table/iterator.go        | 103 +--
 .../dgraph-io/badger/table/table.go           | 226 +++---
 .../github.com/dgraph-io/badger/trie/trie.go  |  97 --
 vendor/github.com/dgraph-io/badger/value.go   | 134 ++--
 .../github.com/dgraph-io/badger/y/checksum.go |  50 --
 .../github.com/dgraph-io/badger/y/iterator.go |   6 +-
 .../dgraph-io/badger/y/watermark.go           |  32 +-
 vendor/vendor.json                            |  42 +-
 28 files changed, 608 insertions(+), 1748 deletions(-)
 delete mode 100644 vendor/github.com/dgraph-io/badger/VERSIONING.md
 delete mode 100644 vendor/github.com/dgraph-io/badger/trie/trie.go
 delete mode 100644 vendor/github.com/dgraph-io/badger/y/checksum.go

diff --git a/vendor/github.com/dgraph-io/badger/CHANGELOG.md b/vendor/github.com/dgraph-io/badger/CHANGELOG.md
index e381a4b7c9b..123d0f572fc 100644
--- a/vendor/github.com/dgraph-io/badger/CHANGELOG.md
+++ b/vendor/github.com/dgraph-io/badger/CHANGELOG.md
@@ -2,17 +2,14 @@
 All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
-and this project adheres to [Serialization Versioning](VERSIONING.md).
+and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
 
-## [1.6.0] - 2019-07-01
+## [2.0.0] - unreleased
 
 This is a release including almost 200 commits, so expect many changes - some of them
-not backward compatible.
-
-Regarding backward compatibility in Badger versions, you might be interested on reading
-[VERSIONING.md](VERSIONING.md).
+not backwards compatible.
 
 _Note_: The hashes in parentheses correspond to the commits that impacted the given feature.
 
@@ -175,8 +172,8 @@ Bug fix: ## [1.0.1] - 2017-11-06 * Fix an uint16 overflow when resizing key slice -[Unreleased]: https://github.com/dgraph-io/badger/compare/v1.6.0...HEAD -[1.6.0]: https://github.com/dgraph-io/badger/compare/v1.5.5...v1.6.0 +[Unreleased]: https://github.com/dgraph-io/badger/compare/v2.0-rc1...HEAD +[2.0-rc1]: https://github.com/dgraph-io/badger/compare/v1.5.5...v2.0-rc1 [1.5.5]: https://github.com/dgraph-io/badger/compare/v1.5.3...v1.5.5 [1.5.3]: https://github.com/dgraph-io/badger/compare/v1.5.2...v1.5.3 [1.5.2]: https://github.com/dgraph-io/badger/compare/v1.5.1...v1.5.2 diff --git a/vendor/github.com/dgraph-io/badger/README.md b/vendor/github.com/dgraph-io/badger/README.md index fe033d9cb58..5779cb6bd90 100644 --- a/vendor/github.com/dgraph-io/badger/README.md +++ b/vendor/github.com/dgraph-io/badger/README.md @@ -6,7 +6,7 @@ BadgerDB is an embeddable, persistent and fast key-value (KV) database written in pure Go. It's meant to be a performant alternative to non-Go-based key-value stores like [RocksDB](https://github.com/facebook/rocksdb). -## Project Status [Jun 26, 2019] +## Project Status [Oct 27, 2018] Badger is stable and is being used to serve data sets worth hundreds of terabytes. Badger supports concurrent ACID transactions with serializable @@ -15,20 +15,14 @@ snapshot isolation (SSI) guarantees. A Jepsen-style bank test runs nightly for Badger has also been tested to work with filesystem level anomalies, to ensure persistence and consistency. -Badger v1.0 was released in Nov 2017, and the latest version that is data-compatible -with v1.0 is v1.6.0. - -Badger v2.0, a new release coming up very soon will use a new storage format which won't -be compatible with all of the v1.x. The [Changelog] is kept fairly up-to-date. - -For more details on our version naming schema please read [Choosing a version](#choosing-a-version). +Badger v1.0 was released in Nov 2017, with a Badger v2.0 release coming up in a +few months. 
The [Changelog] is kept fairly up-to-date. [Changelog]:https://github.com/dgraph-io/badger/blob/master/CHANGELOG.md ## Table of Contents * [Getting Started](#getting-started) + [Installing](#installing) - - [Choosing a version](#choosing-a-version) + [Opening a database](#opening-a-database) + [Transactions](#transactions) - [Read-only transactions](#read-only-transactions) @@ -67,27 +61,6 @@ $ go get github.com/dgraph-io/badger/... This will retrieve the library and install the `badger` command line utility into your `$GOBIN` path. -#### Choosing a version - -BadgerDB is a pretty special package from the point of view that the most important change we can -make to it is not on its API but rather on how data is stored on disk. - -This is why we follow a version naming schema that differs from Semantic Versioning. - -- New major versions are released when the data format on disk changes in an incompatible way. -- New minor versions are released whenever the API changes but data compatibility is maintained. - Note that the changes on the API could be backward-incompatible - unlike Semantic Versioning. -- New patch versions are released when there's no changes to the data format nor the API. - -Following these rules: - -- v1.5.0 and v1.6.0 can be used on top of the same files without any concerns, as their major - version is the same, therefore the data format on disk is compatible. -- v1.6.0 and v2.0.0 are data incompatible as their major version implies, so files created with - v1.6.0 will need to be converted into the new format before they can be used by v2.0.0. - -For a longer explanation on the reasons behind using a new versioning naming schema, you can read -[VERSIONING.md](VERSIONING.md). ### Opening a database The top-level object in Badger is a `DB`. It represents multiple files on disk @@ -109,7 +82,10 @@ import ( func main() { // Open the Badger database located in the /tmp/badger directory. // It will be created if it doesn't exist. 
- db, err := badger.Open(badger.DefaultOptions("tmp/badger")) + opts := badger.DefaultOptions + opts.Dir = "/tmp/badger" + opts.ValueDir = "/tmp/badger" + db, err := badger.Open(opts) if err != nil { log.Fatal(err) } diff --git a/vendor/github.com/dgraph-io/badger/VERSIONING.md b/vendor/github.com/dgraph-io/badger/VERSIONING.md deleted file mode 100644 index a890a36ffb3..00000000000 --- a/vendor/github.com/dgraph-io/badger/VERSIONING.md +++ /dev/null @@ -1,47 +0,0 @@ -# Serialization Versioning: Semantic Versioning for databases - -Semantic Versioning, commonly known as SemVer, is a great idea that has been very widely adopted as -a way to decide how to name software versions. The whole concept is very well summarized on -semver.org with the following lines: - -> Given a version number MAJOR.MINOR.PATCH, increment the: -> -> 1. MAJOR version when you make incompatible API changes, -> 2. MINOR version when you add functionality in a backwards-compatible manner, and -> 3. PATCH version when you make backwards-compatible bug fixes. -> -> Additional labels for pre-release and build metadata are available as extensions to the -> MAJOR.MINOR.PATCH format. - -Unfortunately, API changes are not the most important changes for libraries that serialize data for -later consumption. For these libraries, such as BadgerDB, changes to the API are much easier to -handle than change to the data format used to store data on disk. - -## Serialization Version specification - -Serialization Versioning, like Semantic Versioning, uses 3 numbers and also calls them -MAJOR.MINOR.PATCH, but the semantics of the numbers are slightly modified: - -Given a version number MAJOR.MINOR.PATCH, increment the: - -- MAJOR version when you make changes that require a transformation of the dataset before it can be -used again. -- MINOR version when old datasets are still readable but the API might have changed in -backwards-compatible or incompatible ways. 
-- PATCH version when you make backwards-compatible bug fixes. - -Additional labels for pre-release and build metadata are available as extensions to the -MAJOR.MINOR.PATCH format. - -Following this naming strategy, migration from v1.x to v2.x requires a migration strategy for your -existing dataset, and as such has to be carefully planned. Migrations in between different minor -versions (e.g. v1.5.x and v1.6.x) might break your build, as the API *might* have changed, but once -your code compiles there's no need for any data migration. Lastly, changes in between two different -patch versions should never break your build or dataset. - -For more background on our decision to adopt Serialization Versioning, read the blog post -[Semantic Versioning, Go Modules, and Databases][blog] and the original proposal on -[this comment on Dgraph's Discuss forum][discuss]. - -[blog]: https://blog.dgraph.io/post/serialization-versioning/ -[discuss]: https://discuss.dgraph.io/t/go-modules-on-badger-and-dgraph/4662/7 \ No newline at end of file diff --git a/vendor/github.com/dgraph-io/badger/backup.go b/vendor/github.com/dgraph-io/badger/backup.go index 2569b310050..e270a85b563 100644 --- a/vendor/github.com/dgraph-io/badger/backup.go +++ b/vendor/github.com/dgraph-io/badger/backup.go @@ -69,10 +69,28 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) { } } + // Convert key and/or values to a different format if requested before writing + // the key-value pair. 
+ keyCopy := item.KeyCopy(nil) + if stream.db.opt.BackupKeyFn != nil { + var err error + keyCopy, err = stream.db.opt.BackupKeyFn(keyCopy) + if err != nil { + return nil, err + } + } + if stream.db.opt.BackupValueFn != nil { + var err error + valCopy, err = stream.db.opt.BackupValueFn(valCopy) + if err != nil { + return nil, err + } + } + // clear txn bits meta := item.meta &^ (bitTxn | bitFinTxn) kv := &pb.KV{ - Key: item.KeyCopy(nil), + Key: keyCopy, Value: valCopy, UserMeta: []byte{item.UserMeta()}, Version: item.Version(), @@ -86,7 +104,7 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) { // If we need to discard earlier versions of this item, add a delete // marker just below the current version. list.Kv = append(list.Kv, &pb.KV{ - Key: item.KeyCopy(nil), + Key: keyCopy, Version: item.Version() - 1, Meta: []byte{bitDelete}, }) @@ -127,23 +145,20 @@ func writeTo(list *pb.KVList, w io.Writer) error { return err } -// KVLoader is used to write KVList objects in to badger. It can be used to restore a backup. -type KVLoader struct { +type loader struct { db *DB throttle *y.Throttle entries []*Entry } -// NewKVLoader returns a new instance of KVLoader. -func (db *DB) NewKVLoader(maxPendingWrites int) *KVLoader { - return &KVLoader{ +func (db *DB) newLoader(maxPendingWrites int) *loader { + return &loader{ db: db, throttle: y.NewThrottle(maxPendingWrites), } } -// Set writes the key-value pair to the database. -func (l *KVLoader) Set(kv *pb.KV) error { +func (l *loader) set(kv *pb.KV) error { var userMeta, meta byte if len(kv.UserMeta) > 0 { userMeta = kv.UserMeta[0] @@ -165,7 +180,7 @@ func (l *KVLoader) Set(kv *pb.KV) error { return nil } -func (l *KVLoader) send() error { +func (l *loader) send() error { if err := l.throttle.Do(); err != nil { return err } @@ -179,8 +194,7 @@ func (l *KVLoader) send() error { return nil } -// Finish is meant to be called after all the key-value pairs have been loaded. 
-func (l *KVLoader) Finish() error { +func (l *loader) finish() error { if len(l.entries) > 0 { if err := l.send(); err != nil { return err @@ -191,8 +205,7 @@ func (l *KVLoader) Finish() error { // Load reads a protobuf-encoded list of all entries from a reader and writes // them to the database. This can be used to restore the database from a backup -// made by calling DB.Backup(). If more complex logic is needed to restore a badger -// backup, the KVLoader interface should be used instead. +// made by calling DB.Backup(). // // DB.Load() should be called on a database that is not running any other // concurrent transactions while it is running. @@ -200,7 +213,7 @@ func (db *DB) Load(r io.Reader, maxPendingWrites int) error { br := bufio.NewReaderSize(r, 16<<10) unmarshalBuf := make([]byte, 1<<10) - ldr := db.NewKVLoader(maxPendingWrites) + ldr := db.newLoader(maxPendingWrites) for { var sz uint64 err := binary.Read(br, binary.LittleEndian, &sz) @@ -224,7 +237,24 @@ func (db *DB) Load(r io.Reader, maxPendingWrites int) error { } for _, kv := range list.Kv { - if err := ldr.Set(kv); err != nil { + // If the keys or values were transformed before backup, reverse those + // changes before restoring the key-value pair. 
+ if db.opt.RestoreKeyFn != nil { + var err error + kv.Key, err = db.opt.RestoreKeyFn(kv.Key) + if err != nil { + return err + } + } + if db.opt.RestoreValueFn != nil { + var err error + kv.Value, err = db.opt.RestoreValueFn(kv.Value) + if err != nil { + return err + } + } + + if err := ldr.set(kv); err != nil { return err } @@ -236,7 +266,7 @@ func (db *DB) Load(r io.Reader, maxPendingWrites int) error { } } - if err := ldr.Finish(); err != nil { + if err := ldr.finish(); err != nil { return err } db.orc.txnMark.Done(db.orc.nextTxnTs - 1) diff --git a/vendor/github.com/dgraph-io/badger/compaction.go b/vendor/github.com/dgraph-io/badger/compaction.go index 375e40be2c3..931d56664c0 100644 --- a/vendor/github.com/dgraph-io/badger/compaction.go +++ b/vendor/github.com/dgraph-io/badger/compaction.go @@ -64,7 +64,7 @@ func (r keyRange) overlapsWith(dst keyRange) bool { return true } -func getKeyRange(tables ...*table.Table) keyRange { +func getKeyRange(tables []*table.Table) keyRange { if len(tables) == 0 { return keyRange{} } @@ -78,9 +78,6 @@ func getKeyRange(tables ...*table.Table) keyRange { biggest = tables[i].Biggest() } } - - // We pick all the versions of the smallest and the biggest key. Note that version zero would - // be the rightmost key, considering versions are default sorted in descending order. return keyRange{ left: y.KeyWithTs(y.ParseKey(smallest), math.MaxUint64), right: y.KeyWithTs(y.ParseKey(biggest), 0), diff --git a/vendor/github.com/dgraph-io/badger/db.go b/vendor/github.com/dgraph-io/badger/db.go index 8755986283c..21bb22d6f08 100644 --- a/vendor/github.com/dgraph-io/badger/db.go +++ b/vendor/github.com/dgraph-io/badger/db.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "encoding/binary" + "encoding/hex" "expvar" "io" "math" @@ -58,6 +59,8 @@ type closers struct { pub *y.Closer } +type callback func(kv *pb.KVList) + // DB provides the various functions required to interact with Badger. // DB is thread-safe. 
type DB struct { @@ -189,17 +192,8 @@ func Open(opt Options) (db *DB, err error) { opt.maxBatchSize = (15 * opt.MaxTableSize) / 100 opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize) - // We are limiting opt.ValueThreshold to maxValueThreshold for now. - if opt.ValueThreshold > maxValueThreshold { - return nil, errors.Errorf("Invalid ValueThreshold, must be less or equal to %d", - maxValueThreshold) - } - - // If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using - // the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize. - if int64(opt.ValueThreshold) > opt.maxBatchSize { - return nil, errors.Errorf("Valuethreshold greater than max batch size of %d. Either "+ - "reduce opt.ValueThreshold or increase opt.MaxTableSize.", opt.maxBatchSize) + if opt.ValueThreshold > ValueThresholdLimit { + return nil, ErrValueThreshold } if opt.ReadOnly { @@ -374,9 +368,6 @@ func (db *DB) close() (err error) { // Stop writes next. db.closers.writes.SignalAndWait() - // Don't accept any more write. - close(db.writeCh) - db.closers.pub.SignalAndWait() // Now close the value log. @@ -415,7 +406,6 @@ func (db *DB) close() (err error) { time.Sleep(10 * time.Millisecond) } } - db.stopMemoryFlush() db.stopCompactions() // Force Compact L0 @@ -469,12 +459,6 @@ func (db *DB) close() (err error) { return err } -// VerifyChecksum verifies checksum for all tables on all levels. -// This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification. -func (db *DB) VerifyChecksum() error { - return db.lc.verifyChecksum() -} - const ( lockFile = "LOCK" ) @@ -744,19 +728,15 @@ func (db *DB) doWrites(lc *y.Closer) { } closedCase: - // All the pending request are drained. - // Don't close the writeCh, because it has be used in several places. - for { - select { - case r = <-db.writeCh: - reqs = append(reqs, r) - default: - pendingCh <- struct{}{} // Push to pending before doing a write. 
- writeRequests(reqs) - return - } + close(db.writeCh) + for r := range db.writeCh { // Flush the channel. + reqs = append(reqs, r) } + pendingCh <- struct{}{} // Push to pending before doing a write. + writeRequests(reqs) + return + writeCase: go writeRequests(reqs) reqs = make([]*request, 0, 10) @@ -913,7 +893,7 @@ func (db *DB) handleFlushTask(ft flushTask) error { db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr) } - tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode, db.opt.ChecksumVerificationMode) + tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode, nil) if err != nil { db.elog.Printf("ERROR while opening table: %v", err) return err @@ -1217,15 +1197,12 @@ func (db *DB) MaxBatchSize() int64 { return db.opt.maxBatchSize } -func (db *DB) stopMemoryFlush() { +func (db *DB) stopCompactions() { // Stop memtable flushes. if db.closers.memtable != nil { close(db.flushChan) db.closers.memtable.SignalAndWait() } -} - -func (db *DB) stopCompactions() { // Stop compactions. if db.closers.compactors != nil { db.closers.compactors.SignalAndWait() @@ -1238,10 +1215,6 @@ func (db *DB) startCompactions() { db.closers.compactors = y.NewCloser(1) db.lc.startCompact(db.closers.compactors) } -} - -func (db *DB) startMemoryFlush() { - // Start memory fluhser. if db.closers.memtable != nil { db.flushChan = make(chan flushTask, db.opt.NumMemtables) db.closers.memtable = y.NewCloser(1) @@ -1322,47 +1295,29 @@ func (db *DB) Flatten(workers int) error { } } -func (db *DB) blockWrite() { +func (db *DB) prepareToDrop() func() { + if db.opt.ReadOnly { + panic("Attempting to drop data in read-only mode.") + } // Stop accepting new writes. atomic.StoreInt32(&db.blockWrites, 1) // Make all pending writes finish. The following will also close writeCh. db.closers.writes.SignalAndWait() db.opt.Infof("Writes flushed. 
Stopping compactions now...") -} -func (db *DB) unblockWrite() { - db.closers.writes = y.NewCloser(1) - go db.doWrites(db.closers.writes) + // Stop all compactions. + db.stopCompactions() + return func() { + db.opt.Infof("Resuming writes") + db.startCompactions() - // Resume writes. - atomic.StoreInt32(&db.blockWrites, 0) -} + db.writeCh = make(chan *request, kvWriteChCapacity) + db.closers.writes = y.NewCloser(1) + go db.doWrites(db.closers.writes) -func (db *DB) prepareToDrop() func() { - if db.opt.ReadOnly { - panic("Attempting to drop data in read-only mode.") - } - // In order prepare for drop, we need to block the incoming writes and - // write it to db. Then, flush all the pending flushtask. So that, we - // don't miss any entries. - db.blockWrite() - reqs := make([]*request, 0, 10) - for { - select { - case r := <-db.writeCh: - reqs = append(reqs, r) - default: - if err := db.writeRequests(reqs); err != nil { - db.opt.Errorf("writeRequests: %v", err) - } - db.stopMemoryFlush() - return func() { - db.opt.Infof("Resuming writes") - db.startMemoryFlush() - db.unblockWrite() - } - } + // Resume writes. + atomic.StoreInt32(&db.blockWrites, 0) } } @@ -1379,24 +1334,20 @@ func (db *DB) prepareToDrop() func() { // writes are paused before running DropAll, and resumed after it is finished. func (db *DB) DropAll() error { f, err := db.dropAll() - defer f() if err != nil { return err } + if f == nil { + panic("both error and returned function cannot be nil in DropAll") + } + f() return nil } func (db *DB) dropAll() (func(), error) { db.opt.Infof("DropAll called. Blocking writes...") f := db.prepareToDrop() - // prepareToDrop will stop all the incomming write and flushes any pending flush tasks. - // Before we drop, we'll stop the compaction because anyways all the datas are going to - // be deleted. - db.stopCompactions() - resume := func() { - db.startCompactions() - f() - } + // Block all foreign interactions with memory tables. 
db.Lock() defer db.Unlock() @@ -1411,34 +1362,34 @@ func (db *DB) dropAll() (func(), error) { num, err := db.lc.dropTree() if err != nil { - return resume, err + return nil, err } db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num) num, err = db.vlog.dropAll() if err != nil { - return resume, err + return nil, err } db.vhead = valuePointer{} // Zero it out. db.lc.nextFileID = 1 db.opt.Infof("Deleted %d value log files. DropAll done.\n", num) - return resume, nil + return f, nil } // DropPrefix would drop all the keys with the provided prefix. It does this in the following way: // - Stop accepting new writes. -// - Stop memtable flushes before acquiring lock. Because we're acquring lock here -// and memtable flush stalls for lock, which leads to deadlock +// - Stop memtable flushes and compactions. // - Flush out all memtables, skipping over keys with the given prefix, Kp. // - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp // back after a restart. -// - Stop compaction. // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. // - Resume memtable flushes, compactions and writes. func (db *DB) DropPrefix(prefix []byte) error { + db.opt.Infof("DropPrefix called on %s. Blocking writes...", hex.Dump(prefix)) f := db.prepareToDrop() defer f() + // Block all foreign interactions with memory tables. db.Lock() defer db.Unlock() @@ -1462,8 +1413,6 @@ func (db *DB) DropPrefix(prefix []byte) error { } memtable.DecrRef() } - db.stopCompactions() - defer db.startCompactions() db.imm = db.imm[:0] db.mt = skl.NewSkiplist(arenaSize(db.opt)) @@ -1475,22 +1424,12 @@ func (db *DB) DropPrefix(prefix []byte) error { return nil } -// KVList contains a list of key-value pairs. -type KVList = pb.KVList - -// Subscribe can be used to watch key changes for the given key prefixes. -// At least one prefix should be passed, or an error will be returned. 
-// You can use an empty prefix to monitor all changes to the DB. -// This function blocks until the given context is done or an error occurs. -// The given function will be called with a new KVList containing the modified keys and the -// corresponding values. -func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList), prefixes ...[]byte) error { +// Subscribe can be used watch key changes for the given key prefix. +func (db *DB) Subscribe(ctx context.Context, cb callback, prefix []byte, prefixes ...[]byte) error { if cb == nil { return ErrNilCallback } - if len(prefixes) == 0 { - return ErrNoPrefixes - } + prefixes = append(prefixes, prefix) c := y.NewCloser(1) recvCh, id := db.pub.newSubscriber(c, prefixes...) slurp := func(batch *pb.KVList) { diff --git a/vendor/github.com/dgraph-io/badger/errors.go b/vendor/github.com/dgraph-io/badger/errors.go index 347d3ff5265..8d2df6833a4 100644 --- a/vendor/github.com/dgraph-io/badger/errors.go +++ b/vendor/github.com/dgraph-io/badger/errors.go @@ -32,6 +32,11 @@ var ( // range. ErrValueLogSize = errors.New("Invalid ValueLogFileSize, must be between 1MB and 2GB") + // ErrValueThreshold is returned when ValueThreshold is set to a value close to or greater than + // uint16. + ErrValueThreshold = errors.Errorf( + "Invalid ValueThreshold, must be less than %d", ValueThresholdLimit) + // ErrKeyNotFound is returned when key isn't found on a txn.Get. ErrKeyNotFound = errors.New("Key not found") @@ -109,7 +114,4 @@ var ( // ErrNilCallback is returned when subscriber's callback is nil. ErrNilCallback = errors.New("Callback cannot be nil") - - // ErrNoPrefixes is returned when subscriber doesn't provide any prefix. 
- ErrNoPrefixes = errors.New("At least one key prefix is required") ) diff --git a/vendor/github.com/dgraph-io/badger/go.sum b/vendor/github.com/dgraph-io/badger/go.sum index 7e32ad1714e..3ae08e7741a 100644 --- a/vendor/github.com/dgraph-io/badger/go.sum +++ b/vendor/github.com/dgraph-io/badger/go.sum @@ -17,7 +17,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= diff --git a/vendor/github.com/dgraph-io/badger/levels.go b/vendor/github.com/dgraph-io/badger/levels.go index a4f1f7500e4..a4efd6624fe 100644 --- a/vendor/github.com/dgraph-io/badger/levels.go +++ b/vendor/github.com/dgraph-io/badger/levels.go @@ -19,6 +19,7 @@ package badger import ( "bytes" "fmt" + "math" "math/rand" "os" "sort" @@ -150,7 +151,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { return } - t, err := table.OpenTable(fd, db.opt.TableLoadingMode, db.opt.ChecksumVerificationMode) + t, err := table.OpenTable(fd, db.opt.TableLoadingMode, tf.Checksum) if err != nil { if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") { db.opt.Errorf(err.Error()) @@ -423,7 +424,7 @@ func (s *levelsController) compactBuildTables( var hasOverlap bool { - kr := getKeyRange(cd.top...) + kr := getKeyRange(cd.top) for i, lh := range s.levels { if i <= lev { // Skip upper levels. 
continue @@ -562,31 +563,28 @@ func (s *levelsController) compactBuildTables( // called Add() at least once, and builder is not Empty(). s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v", numKeys, numSkips, time.Since(timeStart)) - build := func(fileID uint64) (*table.Table, error) { - fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true) - if err != nil { - return nil, errors.Wrapf(err, "While opening new table: %d", fileID) - } + if !builder.Empty() { + numBuilds++ + fileID := s.reserveFileID() + go func(builder *table.Builder) { + defer builder.Close() + + fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true) + if err != nil { + resultCh <- newTableResult{nil, errors.Wrapf(err, "While opening new table: %d", fileID)} + return + } - if _, err := fd.Write(builder.Finish()); err != nil { - return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID) - } + if _, err := fd.Write(builder.Finish()); err != nil { + resultCh <- newTableResult{nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)} + return + } - tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode, - s.kv.opt.ChecksumVerificationMode) - // decrRef is added below. - return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name()) - } - if builder.Empty() { - continue + tbl, err := table.OpenTable(fd, s.kv.opt.TableLoadingMode, nil) + // decrRef is added below. 
+ resultCh <- newTableResult{tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())} + }(builder) } - numBuilds++ - fileID := s.reserveFileID() - go func(builder *table.Builder) { - defer builder.Close() - tbl, err := build(fileID) - resultCh <- newTableResult{tbl, err} - }(builder) } newTables := make([]*table.Table, 0, 20) @@ -633,7 +631,7 @@ func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeS changes := []*pb.ManifestChange{} for _, table := range newTables { changes = append(changes, - newCreateChange(table.ID(), cd.nextLevel.level)) + newCreateChange(table.ID(), cd.nextLevel.level, table.Checksum)) } for _, table := range cd.top { changes = append(changes, newDeleteChange(table.ID())) @@ -682,7 +680,7 @@ func (s *levelsController) fillTablesL0(cd *compactDef) bool { } cd.thisRange = infRange - kr := getKeyRange(cd.top...) + kr := getKeyRange(cd.top) left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, kr) cd.bot = make([]*table.Table, right-left) copy(cd.bot, cd.nextLevel.tables[left:right]) @@ -690,7 +688,7 @@ func (s *levelsController) fillTablesL0(cd *compactDef) bool { if len(cd.bot) == 0 { cd.nextRange = kr } else { - cd.nextRange = getKeyRange(cd.bot...) + cd.nextRange = getKeyRange(cd.bot) } if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) { @@ -700,44 +698,30 @@ func (s *levelsController) fillTablesL0(cd *compactDef) bool { return true } -// sortByOverlap sorts tables in increasing order of overlap with next level. 
-func (s *levelsController) sortByOverlap(tables []*table.Table, cd *compactDef) { - if len(tables) == 0 || cd.nextLevel == nil { - return - } - - tableOverlap := make([]int, len(tables)) - for i := range tables { - // get key range for table - tableRange := getKeyRange(tables[i]) - // get overlap with next level - left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, tableRange) - tableOverlap[i] = right - left - } - - sort.Slice(tables, func(i, j int) bool { - return tableOverlap[i] < tableOverlap[j] - }) -} - func (s *levelsController) fillTables(cd *compactDef) bool { cd.lockLevels() defer cd.unlockLevels() - tables := make([]*table.Table, len(cd.thisLevel.tables)) - copy(tables, cd.thisLevel.tables) - if len(tables) == 0 { + tbls := make([]*table.Table, len(cd.thisLevel.tables)) + copy(tbls, cd.thisLevel.tables) + if len(tbls) == 0 { return false } - // We want to pick files from current level in order of increasing overlap with next level - // tables. Idea here is to first compact file from current level which has least overlap with - // next level. This provides us better write amplification. - s.sortByOverlap(tables, cd) + // Find the biggest table, and compact that first. + // TODO: Try other table picking strategies. + sort.Slice(tbls, func(i, j int) bool { + return tbls[i].Size() > tbls[j].Size() + }) - for _, t := range tables { + for _, t := range tbls { cd.thisSize = t.Size() - cd.thisRange = getKeyRange(t) + cd.thisRange = keyRange{ + // We pick all the versions of the smallest and the biggest key. + left: y.KeyWithTs(y.ParseKey(t.Smallest()), math.MaxUint64), + // Note that version zero would be the rightmost key. + right: y.KeyWithTs(y.ParseKey(t.Biggest()), 0), + } if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) { continue } @@ -755,7 +739,7 @@ func (s *levelsController) fillTables(cd *compactDef) bool { } return true } - cd.nextRange = getKeyRange(cd.bot...) 
+ cd.nextRange = getKeyRange(cd.bot) if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) { continue @@ -863,7 +847,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // the proper order. (That means this update happens before that of some compaction which // deletes the table.) err := s.kv.manifest.addChanges([]*pb.ManifestChange{ - newCreateChange(t.ID(), 0), + newCreateChange(t.ID(), 0, t.Checksum), }) if err != nil { return err @@ -1003,31 +987,3 @@ func (s *levelsController) getTableInfo(withKeysCount bool) (result []TableInfo) }) return } - -// verifyChecksum verifies checksum for all tables on all levels. -func (s *levelsController) verifyChecksum() error { - var tables []*table.Table - for _, l := range s.levels { - l.RLock() - tables = tables[:0] - for _, t := range l.tables { - tables = append(tables, t) - t.IncrRef() - } - l.RUnlock() - - for _, t := range tables { - errChkVerify := t.VerifyChecksum() - if err := t.DecrRef(); err != nil { - s.kv.opt.Errorf("unable to decrease reference of table: %s while "+ - "verifying checksum with error: %s", t.Filename(), err) - } - - if errChkVerify != nil { - return errChkVerify - } - } - } - - return nil -} diff --git a/vendor/github.com/dgraph-io/badger/manifest.go b/vendor/github.com/dgraph-io/badger/manifest.go index 9786b45af97..a5818829473 100644 --- a/vendor/github.com/dgraph-io/badger/manifest.go +++ b/vendor/github.com/dgraph-io/badger/manifest.go @@ -99,7 +99,7 @@ const ( func (m *Manifest) asChanges() []*pb.ManifestChange { changes := make([]*pb.ManifestChange, 0, len(m.Tables)) for id, tm := range m.Tables { - changes = append(changes, newCreateChange(id, int(tm.Level))) + changes = append(changes, newCreateChange(id, int(tm.Level), tm.Checksum)) } return changes } @@ -223,7 +223,7 @@ func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { var magicText = [4]byte{'B', 'd', 'g', 'r'} // The magic version number. 
-const magicVersion = 5 +const magicVersion = 4 func helpRewrite(dir string, m *Manifest) (*os.File, int, error) { rewritePath := filepath.Join(dir, manifestRewriteFilename) @@ -390,7 +390,8 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error { return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id) } build.Tables[tc.Id] = TableManifest{ - Level: uint8(tc.Level), + Level: uint8(tc.Level), + Checksum: append([]byte{}, tc.Checksum...), } for len(build.Levels) <= int(tc.Level) { build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})}) @@ -422,11 +423,12 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error { return nil } -func newCreateChange(id uint64, level int) *pb.ManifestChange { +func newCreateChange(id uint64, level int, checksum []byte) *pb.ManifestChange { return &pb.ManifestChange{ - Id: id, - Op: pb.ManifestChange_CREATE, - Level: uint32(level), + Id: id, + Op: pb.ManifestChange_CREATE, + Level: uint32(level), + Checksum: checksum, } } diff --git a/vendor/github.com/dgraph-io/badger/options.go b/vendor/github.com/dgraph-io/badger/options.go index 1b22c1792c2..2d01522de0a 100644 --- a/vendor/github.com/dgraph-io/badger/options.go +++ b/vendor/github.com/dgraph-io/badger/options.go @@ -64,8 +64,10 @@ type Options struct { CompactL0OnClose bool LogRotatesToFlush int32 - // ChecksumVerificationMode decides when db should verify checksum for SStable blocks. - ChecksumVerificationMode options.ChecksumVerificationMode + BackupKeyFn func([]byte) ([]byte, error) + BackupValueFn func([]byte) ([]byte, error) + RestoreKeyFn func([]byte) ([]byte, error) + RestoreValueFn func([]byte) ([]byte, error) // Transaction start and commit timestamps are managed by end-user. // This is only useful for databases built on top of Badger (like Dgraph). 
@@ -114,15 +116,12 @@ func DefaultOptions(path string) Options { } } -const ( - maxValueThreshold = (1 << 20) // 1 MB -) - // LSMOnlyOptions follows from DefaultOptions, but sets a higher ValueThreshold -// so values would be collocated with the LSM tree, with value log largely acting +// so values would be colocated with the LSM tree, with value log largely acting // as a write-ahead log only. These options would reduce the disk usage of value // log, and make Badger act more like a typical LSM tree. func LSMOnlyOptions(path string) Options { + // Max value length which fits in uint16. // Let's not set any other options, because they can cause issues with the // size of key-value a user can pass to Badger. For e.g., if we set // ValueLogFileSize to 64MB, a user can't pass a value more than that. @@ -132,8 +131,8 @@ func LSMOnlyOptions(path string) Options { // achieve a heavier usage of LSM tree. // NOTE: If a user does not want to set 64KB as the ValueThreshold because // of performance reasons, 1KB would be a good option too, allowing - // values smaller than 1KB to be collocated with the keys in the LSM tree. - return DefaultOptions(path).WithValueThreshold(maxValueThreshold /* 1 MB */) + // values smaller than 1KB to be colocated with the keys in the LSM tree. + return DefaultOptions(path).WithValueThreshold(65500) } // WithDir returns a new Options value with Dir set to the given value. @@ -269,9 +268,9 @@ func (opt Options) WithMaxLevels(val int) Options { // WithValueThreshold returns a new Options value with ValueThreshold set to the given value. // // ValueThreshold sets the threshold used to decide whether a value is stored directly in the LSM -// tree or separately in the log value files. +// tree or separatedly in the log value files. // -// The default value of ValueThreshold is 32, but LSMOnlyOptions sets it to maxValueThreshold. +// The default value of ValueThreshold is 32, but LSMOnlyOptions sets it to 65500. 
func (opt Options) WithValueThreshold(val int) Options { opt.ValueThreshold = val return opt @@ -378,3 +377,39 @@ func (opt Options) WithLogRotatesToFlush(val int32) Options { opt.LogRotatesToFlush = val return opt } + +// WithBackupKeyFn returns a new Options value with BackupKeyFn set to the given value. +// +// BackupKeyFn is used during backup when the user wishes to store the keys in some other format. +func (opt Options) WithBackupKeyFn(fn func([]byte) ([]byte, error)) Options { + opt.BackupKeyFn = fn + return opt +} + +// WithBackupValueFn returns a new Options value with BackupValueFn set to the given value. +// +// BackupValueFn is used during backup when the user wishes to store the values in some other +// format. +func (opt Options) WithBackupValueFn(fn func([]byte) ([]byte, error)) Options { + opt.BackupValueFn = fn + return opt +} + +// WithRestoreKeyFn returns a new Options value with RestoreKeyFn set to the given value. +// +// RestoreKeyFn is used while loading a backup when the user wishes to restore keys that were +// backed up in a different format. It essentially should revert the changes made by BackupKeyFn. +func (opt Options) WithRestoreKeyFn(fn func([]byte) ([]byte, error)) Options { + opt.RestoreKeyFn = fn + return opt +} + +// WithRestoreValueFn returns a new Options value with RestoreValueFn set to the given value. +// +// RestoreValueFn is used while loading a backup when the user wishes to restore values that were +// backed up in a different format. It essentially should revert the changes made by +// BackupValueFn. 
+func (opt Options) WithRestoreValueFn(fn func([]byte) ([]byte, error)) Options { + opt.RestoreValueFn = fn + return opt +} diff --git a/vendor/github.com/dgraph-io/badger/options/options.go b/vendor/github.com/dgraph-io/badger/options/options.go index f73553ab522..06c8b1b7f0e 100644 --- a/vendor/github.com/dgraph-io/badger/options/options.go +++ b/vendor/github.com/dgraph-io/badger/options/options.go @@ -28,18 +28,3 @@ const ( // MemoryMap indicates that that the file must be memory-mapped MemoryMap ) - -// ChecksumVerificationMode tells when should DB verify checksum for SSTable blocks. -type ChecksumVerificationMode int - -const ( - // NoVerification indicates DB should not verify checksum for SSTable blocks. - NoVerification ChecksumVerificationMode = iota - // OnTableRead indicates checksum should be verified while opening SSTtable. - OnTableRead - // OnBlockRead indicates checksum should be verified on every SSTable block read. - OnBlockRead - // OnTableAndBlockRead indicates checksum should be verified - // on SSTable opening and on every block read. 
- OnTableAndBlockRead -) diff --git a/vendor/github.com/dgraph-io/badger/pb/pb.pb.go b/vendor/github.com/dgraph-io/badger/pb/pb.pb.go index c75ab72f82f..f9a2c6eeed4 100644 --- a/vendor/github.com/dgraph-io/badger/pb/pb.pb.go +++ b/vendor/github.com/dgraph-io/badger/pb/pb.pb.go @@ -46,31 +46,6 @@ func (ManifestChange_Operation) EnumDescriptor() ([]byte, []int) { return fileDescriptor_f80abaa17e25ccc8, []int{3, 0} } -type Checksum_Algorithm int32 - -const ( - Checksum_CRC32C Checksum_Algorithm = 0 - Checksum_XXHash64 Checksum_Algorithm = 1 -) - -var Checksum_Algorithm_name = map[int32]string{ - 0: "CRC32C", - 1: "XXHash64", -} - -var Checksum_Algorithm_value = map[string]int32{ - "CRC32C": 0, - "XXHash64": 1, -} - -func (x Checksum_Algorithm) String() string { - return proto.EnumName(Checksum_Algorithm_name, int32(x)) -} - -func (Checksum_Algorithm) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_f80abaa17e25ccc8, []int{6, 0} -} - type KV struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` @@ -266,6 +241,7 @@ type ManifestChange struct { Id uint64 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,proto3,enum=pb.ManifestChange_Operation" json:"Op,omitempty"` Level uint32 `protobuf:"varint,3,opt,name=Level,proto3" json:"Level,omitempty"` + Checksum []byte `protobuf:"bytes,4,opt,name=Checksum,proto3" json:"Checksum,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -325,226 +301,48 @@ func (m *ManifestChange) GetLevel() uint32 { return 0 } -type BlockOffset struct { - Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Offset uint32 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` - Len uint32 `protobuf:"varint,3,opt,name=len,proto3" json:"len,omitempty"` - 
XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *BlockOffset) Reset() { *m = BlockOffset{} } -func (m *BlockOffset) String() string { return proto.CompactTextString(m) } -func (*BlockOffset) ProtoMessage() {} -func (*BlockOffset) Descriptor() ([]byte, []int) { - return fileDescriptor_f80abaa17e25ccc8, []int{4} -} -func (m *BlockOffset) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *BlockOffset) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_BlockOffset.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalTo(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *BlockOffset) XXX_Merge(src proto.Message) { - xxx_messageInfo_BlockOffset.Merge(m, src) -} -func (m *BlockOffset) XXX_Size() int { - return m.Size() -} -func (m *BlockOffset) XXX_DiscardUnknown() { - xxx_messageInfo_BlockOffset.DiscardUnknown(m) -} - -var xxx_messageInfo_BlockOffset proto.InternalMessageInfo - -func (m *BlockOffset) GetKey() []byte { - if m != nil { - return m.Key - } - return nil -} - -func (m *BlockOffset) GetOffset() uint32 { - if m != nil { - return m.Offset - } - return 0 -} - -func (m *BlockOffset) GetLen() uint32 { - if m != nil { - return m.Len - } - return 0 -} - -type TableIndex struct { - Offsets []*BlockOffset `protobuf:"bytes,1,rep,name=offsets,proto3" json:"offsets,omitempty"` - BloomFilter []byte `protobuf:"bytes,2,opt,name=bloom_filter,json=bloomFilter,proto3" json:"bloom_filter,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *TableIndex) Reset() { *m = TableIndex{} } -func (m *TableIndex) String() string { return proto.CompactTextString(m) } -func (*TableIndex) ProtoMessage() {} -func (*TableIndex) Descriptor() ([]byte, []int) { - return fileDescriptor_f80abaa17e25ccc8, []int{5} -} 
-func (m *TableIndex) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *TableIndex) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_TableIndex.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalTo(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *TableIndex) XXX_Merge(src proto.Message) { - xxx_messageInfo_TableIndex.Merge(m, src) -} -func (m *TableIndex) XXX_Size() int { - return m.Size() -} -func (m *TableIndex) XXX_DiscardUnknown() { - xxx_messageInfo_TableIndex.DiscardUnknown(m) -} - -var xxx_messageInfo_TableIndex proto.InternalMessageInfo - -func (m *TableIndex) GetOffsets() []*BlockOffset { - if m != nil { - return m.Offsets - } - return nil -} - -func (m *TableIndex) GetBloomFilter() []byte { +func (m *ManifestChange) GetChecksum() []byte { if m != nil { - return m.BloomFilter + return m.Checksum } return nil } -type Checksum struct { - Algo Checksum_Algorithm `protobuf:"varint,1,opt,name=algo,proto3,enum=pb.Checksum_Algorithm" json:"algo,omitempty"` - Sum uint64 `protobuf:"varint,2,opt,name=sum,proto3" json:"sum,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Checksum) Reset() { *m = Checksum{} } -func (m *Checksum) String() string { return proto.CompactTextString(m) } -func (*Checksum) ProtoMessage() {} -func (*Checksum) Descriptor() ([]byte, []int) { - return fileDescriptor_f80abaa17e25ccc8, []int{6} -} -func (m *Checksum) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Checksum) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Checksum.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalTo(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Checksum) XXX_Merge(src proto.Message) { - 
xxx_messageInfo_Checksum.Merge(m, src) -} -func (m *Checksum) XXX_Size() int { - return m.Size() -} -func (m *Checksum) XXX_DiscardUnknown() { - xxx_messageInfo_Checksum.DiscardUnknown(m) -} - -var xxx_messageInfo_Checksum proto.InternalMessageInfo - -func (m *Checksum) GetAlgo() Checksum_Algorithm { - if m != nil { - return m.Algo - } - return Checksum_CRC32C -} - -func (m *Checksum) GetSum() uint64 { - if m != nil { - return m.Sum - } - return 0 -} - func init() { proto.RegisterEnum("pb.ManifestChange_Operation", ManifestChange_Operation_name, ManifestChange_Operation_value) - proto.RegisterEnum("pb.Checksum_Algorithm", Checksum_Algorithm_name, Checksum_Algorithm_value) proto.RegisterType((*KV)(nil), "pb.KV") proto.RegisterType((*KVList)(nil), "pb.KVList") proto.RegisterType((*ManifestChangeSet)(nil), "pb.ManifestChangeSet") proto.RegisterType((*ManifestChange)(nil), "pb.ManifestChange") - proto.RegisterType((*BlockOffset)(nil), "pb.BlockOffset") - proto.RegisterType((*TableIndex)(nil), "pb.TableIndex") - proto.RegisterType((*Checksum)(nil), "pb.Checksum") } func init() { proto.RegisterFile("pb.proto", fileDescriptor_f80abaa17e25ccc8) } var fileDescriptor_f80abaa17e25ccc8 = []byte{ - // 495 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xdd, 0x8e, 0xd2, 0x40, - 0x14, 0x66, 0x4a, 0xb7, 0xc0, 0xe1, 0xc7, 0x3a, 0x31, 0xa4, 0x89, 0x4a, 0xb0, 0xc6, 0x04, 0xcd, - 0x86, 0x0b, 0xd6, 0x78, 0xcf, 0x22, 0x46, 0x02, 0x1b, 0x92, 0x91, 0x90, 0x8d, 0x37, 0x64, 0x0a, - 0x07, 0x68, 0x68, 0x3b, 0x4d, 0x67, 0x20, 0xeb, 0x23, 0xf8, 0x06, 0xbe, 0x87, 0x2f, 0xe1, 0xa5, - 0x8f, 0x60, 0xf0, 0x45, 0xcc, 0x0c, 0x74, 0xe3, 0x46, 0xef, 0xce, 0xf7, 0x33, 0xdf, 0x9c, 0x7e, - 0x1d, 0x28, 0xa7, 0x41, 0x37, 0xcd, 0x84, 0x12, 0xd4, 0x4a, 0x03, 0xff, 0x3b, 0x01, 0x6b, 0x3c, - 0xa7, 0x2e, 0x14, 0x77, 0xf8, 0xc5, 0x23, 0x6d, 0xd2, 0xa9, 0x31, 0x3d, 0xd2, 0x27, 0x70, 0x71, - 0xe0, 0xd1, 0x1e, 0x3d, 0xcb, 0x70, 0x27, 0x40, 0x9f, 0x42, 
0x65, 0x2f, 0x31, 0x5b, 0xc4, 0xa8, - 0xb8, 0x57, 0x34, 0x4a, 0x59, 0x13, 0x37, 0xa8, 0x38, 0xf5, 0xa0, 0x74, 0xc0, 0x4c, 0x86, 0x22, - 0xf1, 0xec, 0x36, 0xe9, 0xd8, 0x2c, 0x87, 0xf4, 0x39, 0x00, 0xde, 0xa5, 0x61, 0x86, 0x72, 0xc1, - 0x95, 0x77, 0x61, 0xc4, 0xca, 0x99, 0xe9, 0x2b, 0x4a, 0xc1, 0x36, 0x81, 0x8e, 0x09, 0x34, 0xb3, - 0xbe, 0x49, 0xaa, 0x0c, 0x79, 0xbc, 0x08, 0x57, 0x1e, 0xb4, 0x49, 0xa7, 0xce, 0xca, 0x27, 0x62, - 0xb4, 0xf2, 0xdb, 0xe0, 0x8c, 0xe7, 0x93, 0x50, 0x2a, 0xda, 0x04, 0x6b, 0x77, 0xf0, 0x48, 0xbb, - 0xd8, 0xa9, 0xf6, 0x9c, 0x6e, 0x1a, 0x74, 0xc7, 0x73, 0x66, 0xed, 0x0e, 0x7e, 0x1f, 0x1e, 0xdf, - 0xf0, 0x24, 0x5c, 0xa3, 0x54, 0x83, 0x2d, 0x4f, 0x36, 0xf8, 0x09, 0x15, 0xbd, 0x84, 0xd2, 0xd2, - 0x00, 0x79, 0x3e, 0x41, 0xf5, 0x89, 0x87, 0x3e, 0x96, 0x5b, 0xfc, 0xaf, 0x04, 0x1a, 0x0f, 0x35, - 0xda, 0x00, 0x6b, 0xb4, 0x32, 0x2d, 0xd9, 0xcc, 0x1a, 0xad, 0xe8, 0x25, 0x58, 0xd3, 0xd4, 0x34, - 0xd4, 0xe8, 0x3d, 0xfb, 0x37, 0xab, 0x3b, 0x4d, 0x31, 0xe3, 0x2a, 0x14, 0x09, 0xb3, 0xa6, 0xa9, - 0xae, 0x74, 0x82, 0x07, 0x8c, 0x4c, 0x71, 0x75, 0x76, 0x02, 0xfe, 0x4b, 0xa8, 0xdc, 0xdb, 0x28, - 0x80, 0x33, 0x60, 0xc3, 0xfe, 0x6c, 0xe8, 0x16, 0xf4, 0xfc, 0x7e, 0x38, 0x19, 0xce, 0x86, 0x2e, - 0xf1, 0x47, 0x50, 0xbd, 0x8e, 0xc4, 0x72, 0x37, 0x5d, 0xaf, 0x25, 0xaa, 0xff, 0xfc, 0xae, 0x26, - 0x38, 0xc2, 0x68, 0x66, 0x9b, 0x3a, 0x3b, 0x23, 0xed, 0x8c, 0x30, 0x39, 0xdf, 0xa8, 0x47, 0xff, - 0x33, 0xc0, 0x8c, 0x07, 0x11, 0x8e, 0x92, 0x15, 0xde, 0xd1, 0xd7, 0x50, 0x3a, 0x39, 0xf3, 0x4a, - 0x1e, 0xe9, 0xcf, 0xf8, 0xeb, 0x2e, 0x96, 0xeb, 0xf4, 0x05, 0xd4, 0x82, 0x48, 0x88, 0x78, 0xb1, - 0x0e, 0x23, 0x85, 0xd9, 0xf9, 0x61, 0x54, 0x0d, 0xf7, 0xc1, 0x50, 0xbe, 0x80, 0xf2, 0x60, 0x8b, - 0xcb, 0x9d, 0xdc, 0xc7, 0xf4, 0x0d, 0xd8, 0x3c, 0xda, 0x08, 0xb3, 0x64, 0xa3, 0xd7, 0xd4, 0xb1, - 0xb9, 0xd6, 0xed, 0x47, 0x1b, 0x91, 0x85, 0x6a, 0x1b, 0x33, 0xe3, 0xd1, 0x5b, 0xca, 0x7d, 0x6c, - 0x12, 0x6d, 0xa6, 0x47, 0xff, 0x15, 0x54, 0xee, 0x4d, 0xa7, 0x56, 0x06, 0x57, 0xbd, 0x81, 0x5b, - 
0xa0, 0x35, 0x28, 0xdf, 0xde, 0x7e, 0xe4, 0x72, 0xfb, 0xee, 0xad, 0x4b, 0xae, 0xdd, 0x1f, 0xc7, - 0x16, 0xf9, 0x79, 0x6c, 0x91, 0x5f, 0xc7, 0x16, 0xf9, 0xf6, 0xbb, 0x55, 0x08, 0x1c, 0xf3, 0xb6, - 0xaf, 0xfe, 0x04, 0x00, 0x00, 0xff, 0xff, 0x83, 0x9a, 0x9b, 0x10, 0xe7, 0x02, 0x00, 0x00, + // 365 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x91, 0x4f, 0x8a, 0xdb, 0x30, + 0x14, 0xc6, 0x47, 0x8a, 0xc7, 0xe3, 0xbc, 0xce, 0x04, 0x57, 0x94, 0x22, 0xfa, 0xc7, 0x18, 0x77, + 0xe3, 0xc5, 0xe0, 0xc5, 0xf4, 0x04, 0x69, 0xea, 0x45, 0x48, 0x42, 0x40, 0x0d, 0xd9, 0x06, 0x39, + 0x7e, 0x6d, 0x8c, 0x13, 0x5b, 0x58, 0x8a, 0x69, 0x6f, 0xd2, 0x0b, 0xf4, 0x04, 0xbd, 0x44, 0x97, + 0x3d, 0x42, 0x49, 0x2f, 0x52, 0xac, 0xfc, 0x81, 0xd0, 0xdd, 0xfb, 0xbe, 0xef, 0xbd, 0x4f, 0xf0, + 0x13, 0x78, 0x2a, 0x4b, 0x54, 0x53, 0x9b, 0x9a, 0x51, 0x95, 0x45, 0x3f, 0x09, 0xd0, 0xc9, 0x92, + 0xf9, 0xd0, 0x2b, 0xf1, 0x1b, 0x27, 0x21, 0x89, 0xef, 0x45, 0x37, 0xb2, 0x17, 0x70, 0xdb, 0xca, + 0xed, 0x1e, 0x39, 0xb5, 0xde, 0x51, 0xb0, 0xd7, 0xd0, 0xdf, 0x6b, 0x6c, 0x56, 0x3b, 0x34, 0x92, + 0xf7, 0x6c, 0xe2, 0x75, 0xc6, 0x0c, 0x8d, 0x64, 0x1c, 0xee, 0x5a, 0x6c, 0x74, 0x51, 0x57, 0xdc, + 0x09, 0x49, 0xec, 0x88, 0xb3, 0x64, 0x6f, 0x01, 0xf0, 0xab, 0x2a, 0x1a, 0xd4, 0x2b, 0x69, 0xf8, + 0xad, 0x0d, 0xfb, 0x27, 0x67, 0x68, 0x18, 0x03, 0xc7, 0x16, 0xba, 0xb6, 0xd0, 0xce, 0xdd, 0x4b, + 0xda, 0x34, 0x28, 0x77, 0xab, 0x22, 0xe7, 0x10, 0x92, 0xf8, 0x41, 0x78, 0x47, 0x63, 0x9c, 0x47, + 0x21, 0xb8, 0x93, 0xe5, 0xb4, 0xd0, 0x86, 0xbd, 0x04, 0x5a, 0xb6, 0x9c, 0x84, 0xbd, 0xf8, 0xd9, + 0x93, 0x9b, 0xa8, 0x2c, 0x99, 0x2c, 0x05, 0x2d, 0xdb, 0x68, 0x08, 0xcf, 0x67, 0xb2, 0x2a, 0x3e, + 0xa3, 0x36, 0xa3, 0x8d, 0xac, 0xbe, 0xe0, 0x27, 0x34, 0xec, 0x11, 0xee, 0xd6, 0x56, 0xe8, 0xd3, + 0x05, 0xeb, 0x2e, 0xae, 0xf7, 0xc4, 0x79, 0x25, 0xfa, 0x41, 0x60, 0x70, 0x9d, 0xb1, 0x01, 0xd0, + 0x71, 0x6e, 0x29, 0x39, 0x82, 0x8e, 0x73, 0xf6, 0x08, 0x74, 0xae, 0x2c, 0xa1, 0xc1, 0xd3, 0x9b, 
+ 0xff, 0xbb, 0x92, 0xb9, 0xc2, 0x46, 0x9a, 0xa2, 0xae, 0x04, 0x9d, 0xab, 0x0e, 0xe9, 0x14, 0x5b, + 0xdc, 0x5a, 0x70, 0x0f, 0xe2, 0x28, 0xd8, 0x2b, 0xf0, 0x46, 0x1b, 0x5c, 0x97, 0x7a, 0xbf, 0xb3, + 0xd8, 0xee, 0xc5, 0x45, 0x47, 0xef, 0xa0, 0x7f, 0xa9, 0x60, 0x00, 0xee, 0x48, 0xa4, 0xc3, 0x45, + 0xea, 0xdf, 0x74, 0xf3, 0xc7, 0x74, 0x9a, 0x2e, 0x52, 0x9f, 0x7c, 0xf0, 0x7f, 0x1d, 0x02, 0xf2, + 0xfb, 0x10, 0x90, 0x3f, 0x87, 0x80, 0x7c, 0xff, 0x1b, 0xdc, 0x64, 0xae, 0xfd, 0xdf, 0xf7, 0xff, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xeb, 0x28, 0x5d, 0xcf, 0xeb, 0x01, 0x00, 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -703,112 +501,11 @@ func (m *ManifestChange) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintPb(dAtA, i, uint64(m.Level)) } - if m.XXX_unrecognized != nil { - i += copy(dAtA[i:], m.XXX_unrecognized) - } - return i, nil -} - -func (m *BlockOffset) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *BlockOffset) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.Key) > 0 { - dAtA[i] = 0xa - i++ - i = encodeVarintPb(dAtA, i, uint64(len(m.Key))) - i += copy(dAtA[i:], m.Key) - } - if m.Offset != 0 { - dAtA[i] = 0x10 - i++ - i = encodeVarintPb(dAtA, i, uint64(m.Offset)) - } - if m.Len != 0 { - dAtA[i] = 0x18 - i++ - i = encodeVarintPb(dAtA, i, uint64(m.Len)) - } - if m.XXX_unrecognized != nil { - i += copy(dAtA[i:], m.XXX_unrecognized) - } - return i, nil -} - -func (m *TableIndex) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *TableIndex) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.Offsets) > 0 { - for _, msg := range m.Offsets { - dAtA[i] = 0xa - i++ - i = encodeVarintPb(dAtA, i, 
uint64(msg.Size())) - n, err := msg.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n - } - } - if len(m.BloomFilter) > 0 { - dAtA[i] = 0x12 - i++ - i = encodeVarintPb(dAtA, i, uint64(len(m.BloomFilter))) - i += copy(dAtA[i:], m.BloomFilter) - } - if m.XXX_unrecognized != nil { - i += copy(dAtA[i:], m.XXX_unrecognized) - } - return i, nil -} - -func (m *Checksum) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Checksum) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if m.Algo != 0 { - dAtA[i] = 0x8 - i++ - i = encodeVarintPb(dAtA, i, uint64(m.Algo)) - } - if m.Sum != 0 { - dAtA[i] = 0x10 + if len(m.Checksum) > 0 { + dAtA[i] = 0x22 i++ - i = encodeVarintPb(dAtA, i, uint64(m.Sum)) + i = encodeVarintPb(dAtA, i, uint64(len(m.Checksum))) + i += copy(dAtA[i:], m.Checksum) } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -913,47 +610,7 @@ func (m *ManifestChange) Size() (n int) { if m.Level != 0 { n += 1 + sovPb(uint64(m.Level)) } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - -func (m *BlockOffset) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - l = len(m.Key) - if l > 0 { - n += 1 + l + sovPb(uint64(l)) - } - if m.Offset != 0 { - n += 1 + sovPb(uint64(m.Offset)) - } - if m.Len != 0 { - n += 1 + sovPb(uint64(m.Len)) - } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - -func (m *TableIndex) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.Offsets) > 0 { - for _, e := range m.Offsets { - l = e.Size() - n += 1 + l + sovPb(uint64(l)) - } - } - l = len(m.BloomFilter) + l = len(m.Checksum) if l > 0 { n += 1 + l + sovPb(uint64(l)) } @@ -963,24 +620,6 @@ func (m *TableIndex) Size() (n int) { return n } -func (m *Checksum) Size() (n int) { - 
if m == nil { - return 0 - } - var l int - _ = l - if m.Algo != 0 { - n += 1 + sovPb(uint64(m.Algo)) - } - if m.Sum != 0 { - n += 1 + sovPb(uint64(m.Sum)) - } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - func sovPb(x uint64) (n int) { for { n++ @@ -1503,63 +1142,9 @@ func (m *ManifestChange) Unmarshal(dAtA []byte) error { break } } - default: - iNdEx = preIndex - skippy, err := skipPb(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPb - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPb - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *BlockOffset) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: BlockOffset: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: BlockOffset: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Checksum", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -1586,263 +1171,11 @@ func (m *BlockOffset) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...) 
- if m.Key == nil { - m.Key = []byte{} + m.Checksum = append(m.Checksum[:0], dAtA[iNdEx:postIndex]...) + if m.Checksum == nil { + m.Checksum = []byte{} } iNdEx = postIndex - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) - } - m.Offset = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Offset |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Len", wireType) - } - m.Len = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Len |= uint32(b&0x7F) << shift - if b < 0x80 { - break - } - } - default: - iNdEx = preIndex - skippy, err := skipPb(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPb - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPb - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) 
- iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *TableIndex) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: TableIndex: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: TableIndex: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Offsets", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthPb - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthPb - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Offsets = append(m.Offsets, &BlockOffset{}) - if err := m.Offsets[len(m.Offsets)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field BloomFilter", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthPb - } - postIndex := iNdEx + byteLen - if postIndex < 0 { - return ErrInvalidLengthPb - } - if 
postIndex > l { - return io.ErrUnexpectedEOF - } - m.BloomFilter = append(m.BloomFilter[:0], dAtA[iNdEx:postIndex]...) - if m.BloomFilter == nil { - m.BloomFilter = []byte{} - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipPb(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthPb - } - if (iNdEx + skippy) < 0 { - return ErrInvalidLengthPb - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func (m *Checksum) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Checksum: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Checksum: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Algo", wireType) - } - m.Algo = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Algo |= Checksum_Algorithm(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Sum", wireType) - } - m.Sum = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowPb - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Sum |= uint64(b&0x7F) << 
shift - if b < 0x80 { - break - } - } default: iNdEx = preIndex skippy, err := skipPb(dAtA[iNdEx:]) diff --git a/vendor/github.com/dgraph-io/badger/pb/pb.proto b/vendor/github.com/dgraph-io/badger/pb/pb.proto index e8d4cd24b72..c6e7f413d70 100644 --- a/vendor/github.com/dgraph-io/badger/pb/pb.proto +++ b/vendor/github.com/dgraph-io/badger/pb/pb.proto @@ -48,24 +48,5 @@ message ManifestChange { } Operation Op = 2; uint32 Level = 3; // Only used for CREATE -} - -message BlockOffset { - bytes key = 1; - uint32 offset = 2; - uint32 len = 3; -} - -message TableIndex { - repeated BlockOffset offsets = 1; - bytes bloom_filter = 2; -} - -message Checksum { - enum Algorithm { - CRC32C = 0; - XXHash64 = 1; - } - Algorithm algo = 1; // For storing type of Checksum algorithm used - uint64 sum = 2; + bytes Checksum = 4; // Only used for CREATE } diff --git a/vendor/github.com/dgraph-io/badger/publisher.go b/vendor/github.com/dgraph-io/badger/publisher.go index caf23b5c9c4..24588f5c68d 100644 --- a/vendor/github.com/dgraph-io/badger/publisher.go +++ b/vendor/github.com/dgraph-io/badger/publisher.go @@ -17,10 +17,10 @@ package badger import ( + "bytes" "sync" "github.com/dgraph-io/badger/pb" - "github.com/dgraph-io/badger/trie" "github.com/dgraph-io/badger/y" ) @@ -35,7 +35,6 @@ type publisher struct { pubCh chan requests subscribers map[uint64]subscriber nextID uint64 - indexer *trie.Trie } func newPublisher() *publisher { @@ -43,7 +42,6 @@ func newPublisher() *publisher { pubCh: make(chan requests, 1000), subscribers: make(map[uint64]subscriber), nextID: 0, - indexer: trie.NewTrie(), } } @@ -74,37 +72,42 @@ func (p *publisher) listenForUpdates(c *y.Closer) { } func (p *publisher) publishUpdates(reqs requests) { + kvs := &pb.KVList{} p.Lock() defer func() { p.Unlock() // Release all the request. 
reqs.DecrRef() }() - batchedUpdates := make(map[uint64]*pb.KVList) - for _, req := range reqs { - for _, e := range req.Entries { - ids := p.indexer.Get(e.Key) - if len(ids) > 0 { - k := y.SafeCopy(nil, e.Key) - kv := &pb.KV{ - Key: y.ParseKey(k), - Value: y.SafeCopy(nil, e.Value), - Meta: []byte{e.UserMeta}, - ExpiresAt: e.ExpiresAt, - Version: y.ParseTs(k), - } - for id := range ids { - if _, ok := batchedUpdates[id]; !ok { - batchedUpdates[id] = &pb.KVList{} + + // TODO: Optimize this, so we can figure out key -> subscriber quickly, without iterating over + // all the prefixes. + // TODO: Use trie to find subscribers. + for _, s := range p.subscribers { + // BUG: This would send out the same entry multiple times on multiple matches for the same + // subscriber. + for _, prefix := range s.prefixes { + for _, req := range reqs { + for _, e := range req.Entries { + if bytes.HasPrefix(e.Key, prefix) { + // TODO: Maybe we can optimize this by creating the KV once and sending it + // over to multiple subscribers. 
+ k := y.SafeCopy(nil, e.Key) + kv := &pb.KV{ + Key: y.ParseKey(k), + Value: y.SafeCopy(nil, e.Value), + UserMeta: []byte{e.UserMeta}, + ExpiresAt: e.ExpiresAt, + Version: y.ParseTs(k), + } + kvs.Kv = append(kvs.Kv, kv) } - batchedUpdates[id].Kv = append(batchedUpdates[id].Kv, kv) } } } - } - - for id, kvs := range batchedUpdates { - p.subscribers[id].sendCh <- kvs + if len(kvs.GetKv()) > 0 { + s.sendCh <- kvs + } } } @@ -120,9 +123,6 @@ func (p *publisher) newSubscriber(c *y.Closer, prefixes ...[]byte) (<-chan *pb.K sendCh: ch, subCloser: c, } - for _, prefix := range prefixes { - p.indexer.Add(prefix, id) - } return ch, id } @@ -131,9 +131,6 @@ func (p *publisher) cleanSubscribers() { p.Lock() defer p.Unlock() for id, s := range p.subscribers { - for _, prefix := range s.prefixes { - p.indexer.Delete(prefix, id) - } delete(p.subscribers, id) s.subCloser.SignalAndWait() } @@ -142,15 +139,14 @@ func (p *publisher) cleanSubscribers() { func (p *publisher) deleteSubscriber(id uint64) { p.Lock() defer p.Unlock() - if s, ok := p.subscribers[id]; ok { - for _, prefix := range s.prefixes { - p.indexer.Delete(prefix, id) - } + if _, ok := p.subscribers[id]; !ok { + return } delete(p.subscribers, id) } func (p *publisher) sendUpdates(reqs []*request) { + // TODO: Prefix check before pushing into pubCh. if p.noOfSubscribers() != 0 { p.pubCh <- reqs } diff --git a/vendor/github.com/dgraph-io/badger/skl/arena.go b/vendor/github.com/dgraph-io/badger/skl/arena.go index 5aecb796277..def550712f8 100644 --- a/vendor/github.com/dgraph-io/badger/skl/arena.go +++ b/vendor/github.com/dgraph-io/badger/skl/arena.go @@ -120,8 +120,8 @@ func (s *Arena) getKey(offset uint32, size uint16) []byte { // getVal returns byte slice at offset. The given size should be just the value // size and should NOT include the meta bytes. 
-func (s *Arena) getVal(offset uint32, size uint32) (ret y.ValueStruct) { - ret.Decode(s.buf[offset : offset+size]) +func (s *Arena) getVal(offset uint32, size uint16) (ret y.ValueStruct) { + ret.Decode(s.buf[offset : offset+uint32(size)]) return } diff --git a/vendor/github.com/dgraph-io/badger/skl/skl.go b/vendor/github.com/dgraph-io/badger/skl/skl.go index 61e262cf4ce..fc2eff982ba 100644 --- a/vendor/github.com/dgraph-io/badger/skl/skl.go +++ b/vendor/github.com/dgraph-io/badger/skl/skl.go @@ -53,7 +53,7 @@ type node struct { // Multiple parts of the value are encoded as a single uint64 so that it // can be atomically loaded and stored: // value offset: uint32 (bits 0-31) - // value size : uint16 (bits 32-63) + // value size : uint16 (bits 32-47) value uint64 // A byte slice is 24 bytes. We are trying to save space here. @@ -113,13 +113,13 @@ func newNode(arena *Arena, key []byte, v y.ValueStruct, height int) *node { return node } -func encodeValue(valOffset uint32, valSize uint32) uint64 { +func encodeValue(valOffset uint32, valSize uint16) uint64 { return uint64(valSize)<<32 | uint64(valOffset) } -func decodeValue(value uint64) (valOffset uint32, valSize uint32) { +func decodeValue(value uint64) (valOffset uint32, valSize uint16) { valOffset = uint32(value) - valSize = uint32(value >> 32) + valSize = uint16(value >> 32) return } @@ -135,7 +135,7 @@ func NewSkiplist(arenaSize int64) *Skiplist { } } -func (s *node) getValueOffset() (uint32, uint32) { +func (s *node) getValueOffset() (uint32, uint16) { value := atomic.LoadUint64(&s.value) return decodeValue(value) } diff --git a/vendor/github.com/dgraph-io/badger/stream_writer.go b/vendor/github.com/dgraph-io/badger/stream_writer.go index 0b9e4854301..3d2a7992efd 100644 --- a/vendor/github.com/dgraph-io/badger/stream_writer.go +++ b/vendor/github.com/dgraph-io/badger/stream_writer.go @@ -311,7 +311,7 @@ func (w *sortedWriter) createTable(data []byte) error { if _, err := fd.Write(data); err != nil { return err } 
- tbl, err := table.OpenTable(fd, w.db.opt.TableLoadingMode, w.db.opt.ChecksumVerificationMode) + tbl, err := table.OpenTable(fd, w.db.opt.TableLoadingMode, nil) if err != nil { return err } @@ -341,9 +341,10 @@ func (w *sortedWriter) createTable(data []byte) error { } // Now that table can be opened successfully, let's add this to the MANIFEST. change := &pb.ManifestChange{ - Id: tbl.ID(), - Op: pb.ManifestChange_CREATE, - Level: uint32(lhandler.level), + Id: tbl.ID(), + Op: pb.ManifestChange_CREATE, + Level: uint32(lhandler.level), + Checksum: tbl.Checksum, } if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil { return err diff --git a/vendor/github.com/dgraph-io/badger/table/README.md b/vendor/github.com/dgraph-io/badger/table/README.md index 19276079ef1..a784f12680a 100644 --- a/vendor/github.com/dgraph-io/badger/table/README.md +++ b/vendor/github.com/dgraph-io/badger/table/README.md @@ -1,4 +1,4 @@ -Size of table is 123,217,667 bytes for all benchmarks. +Size of table is 122,173,606 bytes for all benchmarks. # BenchmarkRead ``` @@ -6,18 +6,18 @@ $ go test -bench ^BenchmarkRead$ -run ^$ -count 3 goos: linux goarch: amd64 pkg: github.com/dgraph-io/badger/table -BenchmarkRead-16 10 154074944 ns/op -BenchmarkRead-16 10 154340411 ns/op -BenchmarkRead-16 10 151914489 ns/op +BenchmarkRead-16 10 153281932 ns/op +BenchmarkRead-16 10 153454443 ns/op +BenchmarkRead-16 10 155349696 ns/op PASS -ok github.com/dgraph-io/badger/table 22.467s +ok github.com/dgraph-io/badger/table 23.549s ``` -Size of table is 123,217,667 bytes, which is ~118MB. +Size of table is 122,173,606 bytes, which is ~117MB. -The rate is ~762MB/s using LoadToRAM (when table is in RAM). +The rate is ~750MB/s using LoadToRAM (when table is in RAM). -To read a 64MB table, this would take ~0.084s, which is negligible. +To read a 64MB table, this would take ~0.0853s, which is negligible. 
# BenchmarkReadAndBuild ```go @@ -25,35 +25,32 @@ $ go test -bench BenchmarkReadAndBuild -run ^$ -count 3 goos: linux goarch: amd64 pkg: github.com/dgraph-io/badger/table -BenchmarkReadAndBuild-16 1 1026755231 ns/op -BenchmarkReadAndBuild-16 1 1009543316 ns/op -BenchmarkReadAndBuild-16 1 1039920546 ns/op +BenchmarkReadAndBuild-16 2 945041628 ns/op +BenchmarkReadAndBuild-16 2 947120893 ns/op +BenchmarkReadAndBuild-16 2 954909506 ns/op PASS -ok github.com/dgraph-io/badger/table 12.081s +ok github.com/dgraph-io/badger/table 26.856s ``` -The rate is ~123MB/s. To build a 64MB table, this would take ~0.56s. Note that this +The rate is ~122MB/s. To build a 64MB table, this would take ~0.52s. Note that this does NOT include the flushing of the table to disk. All we are doing above is reading one table (which is in RAM) and write one table in memory. -The table building takes 0.56-0.084s ~ 0.4823s. +The table building takes 0.52-0.0853s ~ 0.4347s. # BenchmarkReadMerged Below, we merge 5 tables. The total size remains unchanged at ~122M. ```go $ go test -bench ReadMerged -run ^$ -count 3 -goos: linux -goarch: amd64 -pkg: github.com/dgraph-io/badger/table -BenchmarkReadMerged-16 2 977588975 ns/op -BenchmarkReadMerged-16 2 982140738 ns/op -BenchmarkReadMerged-16 2 962046017 ns/op +BenchmarkReadMerged-16 2 954475788 ns/op +BenchmarkReadMerged-16 2 955252462 ns/op +BenchmarkReadMerged-16 2 956857353 ns/op PASS -ok github.com/dgraph-io/badger/table 27.433s +ok github.com/dgraph-io/badger/table 33.327s ``` -The rate is ~120MB/s. To read a 64MB table using merge iterator, this would take ~0.53s. +The rate is ~122MB/s. To read a 64MB table using merge iterator, this would take ~0.52s. 
# BenchmarkRandomRead @@ -62,47 +59,11 @@ go test -bench BenchmarkRandomRead$ -run ^$ -count 3 goos: linux goarch: amd64 pkg: github.com/dgraph-io/badger/table -BenchmarkRandomRead-16 500000 2645 ns/op -BenchmarkRandomRead-16 500000 2648 ns/op -BenchmarkRandomRead-16 500000 2614 ns/op +BenchmarkRandomRead-16 300000 3596 ns/op +BenchmarkRandomRead-16 300000 3621 ns/op +BenchmarkRandomRead-16 300000 3596 ns/op PASS -ok github.com/dgraph-io/badger/table 50.850s -``` -For random read benchmarking, we are randomly reading a key and verifying its value. - -# DB Open benchmark -1. Create badger DB with 2 billion key-value pairs (about 380GB of data) -``` -badger fill -m 2000 --dir="/tmp/data" --sorted -``` -2. Clear buffers and swap memory -``` -free -mh && sync && echo 3 | sudo tee /proc/sys/vm/drop_caches && sudo swapoff -a && sudo swapon -a && free -mh +ok github.com/dgraph-io/badger/table 44.727s ``` -Also flush disk buffers -``` -blockdev --flushbufs /dev/nvme0n1p4 -``` -3. Run the benchmark -``` -go test -run=^$ github.com/dgraph-io/badger -bench ^BenchmarkDBOpen$ -benchdir="/tmp/data" -v -badger 2019/06/04 17:15:56 INFO: 126 tables out of 1028 opened in 3.017s -badger 2019/06/04 17:15:59 INFO: 257 tables out of 1028 opened in 6.014s -badger 2019/06/04 17:16:02 INFO: 387 tables out of 1028 opened in 9.017s -badger 2019/06/04 17:16:05 INFO: 516 tables out of 1028 opened in 12.025s -badger 2019/06/04 17:16:08 INFO: 645 tables out of 1028 opened in 15.013s -badger 2019/06/04 17:16:11 INFO: 775 tables out of 1028 opened in 18.008s -badger 2019/06/04 17:16:14 INFO: 906 tables out of 1028 opened in 21.003s -badger 2019/06/04 17:16:17 INFO: All 1028 tables opened in 23.851s -badger 2019/06/04 17:16:17 INFO: Replaying file id: 1998 at offset: 332000 -badger 2019/06/04 17:16:17 INFO: Replay took: 9.81µs -goos: linux -goarch: amd64 -pkg: github.com/dgraph-io/badger -BenchmarkDBOpen-16 1 23930082140 ns/op -PASS -ok github.com/dgraph-io/badger 24.076s - -``` -It takes about 
23.851s to open a DB with 2 billion sorted key-value entries. +For random read benchmarking, we are randomly reading a key and verifying its value. diff --git a/vendor/github.com/dgraph-io/badger/table/builder.go b/vendor/github.com/dgraph-io/badger/table/builder.go index 8b363a16061..0657cbca182 100644 --- a/vendor/github.com/dgraph-io/badger/table/builder.go +++ b/vendor/github.com/dgraph-io/badger/table/builder.go @@ -23,10 +23,13 @@ import ( "math" "github.com/AndreasBriese/bbloom" - "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" ) +var ( + restartInterval = 100 // Might want to change this to be based on total size instead of numKeys. +) + func newBuffer(sz int) *bytes.Buffer { b := new(bytes.Buffer) b.Grow(sz) @@ -36,38 +39,44 @@ func newBuffer(sz int) *bytes.Buffer { type header struct { plen uint16 // Overlap with base key. klen uint16 // Length of the diff. - vlen uint32 // Length of value. + vlen uint16 // Length of value. + prev uint32 // Offset for the previous key-value pair. The offset is relative to block base offset. } // Encode encodes the header. func (h header) Encode(b []byte) { binary.BigEndian.PutUint16(b[0:2], h.plen) binary.BigEndian.PutUint16(b[2:4], h.klen) - binary.BigEndian.PutUint32(b[4:8], h.vlen) + binary.BigEndian.PutUint16(b[4:6], h.vlen) + binary.BigEndian.PutUint32(b[6:10], h.prev) } // Decode decodes the header. func (h *header) Decode(buf []byte) int { h.plen = binary.BigEndian.Uint16(buf[0:2]) h.klen = binary.BigEndian.Uint16(buf[2:4]) - h.vlen = binary.BigEndian.Uint32(buf[4:8]) + h.vlen = binary.BigEndian.Uint16(buf[4:6]) + h.prev = binary.BigEndian.Uint32(buf[6:10]) return h.Size() } // Size returns size of the header. Currently it's just a constant. -func (h header) Size() int { return 8 } +func (h header) Size() int { return 10 } // Builder is used in building a table. type Builder struct { + counter int // Number of keys written for the current block. + // Typically tens or hundreds of meg. 
This is for one single file. buf *bytes.Buffer - blockSize uint32 // Max size of block. - baseKey []byte // Base key for the current block. - baseOffset uint32 // Offset for the current block. - entryOffsets []uint32 // Offsets of entries present in current block. + baseKey []byte // Base key for the current block. + baseOffset uint32 // Offset for the current block. - tableIndex *pb.TableIndex + restarts []uint32 // Base offsets of every block. + + // Tracks offset for the previous key-value pair. Offset is relative to block base offset. + prevOffset uint32 keyBuf *bytes.Buffer keyCount int @@ -78,10 +87,7 @@ func NewTableBuilder() *Builder { return &Builder{ keyBuf: newBuffer(1 << 20), buf: newBuffer(1 << 20), - tableIndex: &pb.TableIndex{}, - - // TODO(Ashish): make this configurable - blockSize: 4 * 1024, + prevOffset: math.MaxUint32, // Used for the first element! } } @@ -127,83 +133,38 @@ func (b *Builder) addHelper(key []byte, v y.ValueStruct) { h := header{ plen: uint16(len(key) - len(diffKey)), klen: uint16(len(diffKey)), - vlen: uint32(v.EncodedSize()), + vlen: uint16(v.EncodedSize()), + prev: b.prevOffset, // prevOffset is the location of the last key-value added. } - - // store current entry's offset - y.AssertTrue(b.buf.Len() < math.MaxUint32) - b.entryOffsets = append(b.entryOffsets, uint32(b.buf.Len())-b.baseOffset) + b.prevOffset = uint32(b.buf.Len()) - b.baseOffset // Remember current offset for the next Add call. // Layout: header, diffKey, value. - var hbuf [8]byte + var hbuf [10]byte h.Encode(hbuf[:]) b.buf.Write(hbuf[:]) b.buf.Write(diffKey) // We only need to store the key difference. v.EncodeTo(b.buf) + b.counter++ // Increment number of keys added for this current block. } -/* -Structure of Block. 
-+-------------------+---------------------+--------------------+--------------+------------------+ -| Entry1 | Entry2 | Entry3 | Entry4 | Entry5 | -+-------------------+---------------------+--------------------+--------------+------------------+ -| Entry6 | ... | ... | ... | EntryN | -+-------------------+---------------------+--------------------+--------------+------------------+ -| Block Meta(contains list of offsets used| Block Meta Size | Block | Checksum Size | -| to perform binary search in the block) | (4 Bytes) | Checksum | (4 Bytes) | -+-----------------------------------------+--------------------+--------------+------------------+ -*/ func (b *Builder) finishBlock() { - ebuf := make([]byte, len(b.entryOffsets)*4+4) - for i, offset := range b.entryOffsets { - binary.BigEndian.PutUint32(ebuf[4*i:4*i+4], uint32(offset)) - } - binary.BigEndian.PutUint32(ebuf[len(ebuf)-4:], uint32(len(b.entryOffsets))) - b.buf.Write(ebuf) - - blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block. - b.writeChecksum(blockBuf) - - // TODO(Ashish):Add padding: If we want to make block as multiple of OS pages, we can - // implement padding. This might be useful while using direct I/O. - - // Add key to the block index - bo := &pb.BlockOffset{ - Key: y.Copy(b.baseKey), - Offset: b.baseOffset, - Len: uint32(b.buf.Len()) - b.baseOffset, - } - b.tableIndex.Offsets = append(b.tableIndex.Offsets, bo) -} - -func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool { - // If there is no entry till now, we will return false. - if len(b.entryOffsets) <= 0 { - return false - } - - y.AssertTrue((len(b.entryOffsets)+1)*4+4+8+4 < math.MaxUint32) // check for below statements - // We should include current entry also in size, that's why +1 to len(b.entryOffsets). 
- entriesOffsetsSize := uint32((len(b.entryOffsets)+1)*4 + - 4 + // size of list - 8 + // Sum64 in checksum proto - 4) // checksum length - estimatedSize := uint32(b.buf.Len()) - b.baseOffset + uint32(6 /*header size for entry*/) + - uint32(len(key)) + uint32(value.EncodedSize()) + entriesOffsetsSize - - return estimatedSize > b.blockSize + // When we are at the end of the block and Valid=false, and the user wants to do a Prev, + // we need a dummy header to tell us the offset of the previous key-value pair. + b.addHelper([]byte{}, y.ValueStruct{}) } // Add adds a key-value pair to the block. +// If doNotRestart is true, we will not restart even if b.counter >= restartInterval. func (b *Builder) Add(key []byte, value y.ValueStruct) error { - if b.shouldFinishBlock(key, value) { + if b.counter >= restartInterval { b.finishBlock() // Start a new block. Initialize the block. + b.restarts = append(b.restarts, uint32(b.buf.Len())) + b.counter = 0 b.baseKey = []byte{} - y.AssertTrue(b.buf.Len() < math.MaxUint32) b.baseOffset = uint32(b.buf.Len()) - b.entryOffsets = b.entryOffsets[:0] + b.prevOffset = math.MaxUint32 // First key-value pair of block has header.prev=MaxInt. } b.addHelper(key, value) return nil // Currently, there is no meaningful error. @@ -217,29 +178,30 @@ func (b *Builder) Add(key []byte, value y.ValueStruct) error { // ReachedCapacity returns true if we... roughly (?) reached capacity? func (b *Builder) ReachedCapacity(cap int64) bool { - blocksSize := b.buf.Len() + // length of current buffer - len(b.entryOffsets)*4 + // all entry offsets size - 4 + // count of all entry offsets - 8 + // checksum bytes - 4 // checksum length - estimateSz := blocksSize + - 4 + // Index length - 5*(len(b.tableIndex.Offsets)) // approximate index size - + estimateSz := b.buf.Len() + 8 /* empty header */ + 4*len(b.restarts) + + 8 /* 8 = end of buf offset + len(restarts) */ return int64(estimateSz) > cap } +// blockIndex generates the block index for the table. 
+// It is mainly a list of all the block base offsets. +func (b *Builder) blockIndex() []byte { + // Store the end offset, so we know the length of the final block. + b.restarts = append(b.restarts, uint32(b.buf.Len())) + + // Add 4 because we want to write out number of restarts at the end. + sz := 4*len(b.restarts) + 4 + out := make([]byte, sz) + buf := out + for _, r := range b.restarts { + binary.BigEndian.PutUint32(buf[:4], r) + buf = buf[4:] + } + binary.BigEndian.PutUint32(buf[:4], uint32(len(b.restarts))) + return out +} + // Finish finishes the table by appending the index. -/* -The table structure looks like -+---------+------------+-----------+---------------+ -| Block 1 | Block 2 | Block 3 | Block 4 | -+---------+------------+-----------+---------------+ -| Block 5 | Block 6 | Block ... | Block N | -+---------+------------+-----------+---------------+ -| Index | Index Size | Checksum | Checksum Size | -+---------+------------+-----------+---------------+ -*/ func (b *Builder) Finish() []byte { bf := bbloom.New(float64(b.keyCount), 0.01) var klen [2]byte @@ -259,52 +221,17 @@ func (b *Builder) Finish() []byte { bf.Add(key) } - // Add bloom filter to the index. - b.tableIndex.BloomFilter = bf.JSONMarshal() b.finishBlock() // This will never start a new block. + index := b.blockIndex() + b.buf.Write(index) - index, err := b.tableIndex.Marshal() - y.Check(err) - // Write index the file. - n, err := b.buf.Write(index) + // Write bloom filter. + bdata := bf.JSONMarshal() + n, err := b.buf.Write(bdata) y.Check(err) - - y.AssertTrue(n < math.MaxUint32) - // Write index size. var buf [4]byte binary.BigEndian.PutUint32(buf[:], uint32(n)) - _, err = b.buf.Write(buf[:]) - y.Check(err) + b.buf.Write(buf[:]) - b.writeChecksum(index) return b.buf.Bytes() } - -func (b *Builder) writeChecksum(data []byte) { - // Build checksum for the index. - checksum := pb.Checksum{ - // TODO: The checksum type should be configurable from the - // options. 
- // We chose to use CRC32 as the default option because - // it performed better compared to xxHash64. - // See the BenchmarkChecksum in table_test.go file - // Size => 1024 B 2048 B - // CRC32 => 63.7 ns/op 112 ns/op - // xxHash64 => 87.5 ns/op 158 ns/op - Sum: y.CalculateChecksum(data, pb.Checksum_CRC32C), - Algo: pb.Checksum_CRC32C, - } - - // Write checksum to the file. - chksum, err := checksum.Marshal() - y.Check(err) - n, err := b.buf.Write(chksum) - y.Check(err) - - y.AssertTrue(n < math.MaxUint32) - // Write checksum size. - var buf [4]byte - binary.BigEndian.PutUint32(buf[:], uint32(n)) - _, err = b.buf.Write(buf[:]) - y.Check(err) -} diff --git a/vendor/github.com/dgraph-io/badger/table/iterator.go b/vendor/github.com/dgraph-io/badger/table/iterator.go index c1df37ad8f9..0eb5ed01a91 100644 --- a/vendor/github.com/dgraph-io/badger/table/iterator.go +++ b/vendor/github.com/dgraph-io/badger/table/iterator.go @@ -18,8 +18,8 @@ package table import ( "bytes" - "encoding/binary" "io" + "math" "sort" "github.com/dgraph-io/badger/y" @@ -27,17 +27,16 @@ import ( ) type blockIterator struct { - data []byte - pos uint32 - err error - baseKey []byte - numEntries int - entriesIndexStart int - currentIdx int + data []byte + pos uint32 + err error + baseKey []byte key []byte val []byte init bool + + last header // The last header we saw. 
} func (itr *blockIterator) Reset() { @@ -47,13 +46,11 @@ func (itr *blockIterator) Reset() { itr.key = []byte{} itr.val = []byte{} itr.init = false - itr.currentIdx = -1 + itr.last = header{} } func (itr *blockIterator) Init() { if !itr.init { - itr.currentIdx = -1 - itr.Next() } } @@ -73,65 +70,28 @@ var ( current = 1 ) -func (itr *blockIterator) getOffset(idx int) uint32 { - y.AssertTrue(idx >= 0 && idx < itr.numEntries) - return binary.BigEndian.Uint32(itr.data[itr.entriesIndexStart+4*idx:]) -} - -func (itr *blockIterator) getKey(idx int) []byte { - y.AssertTrue(idx >= 0 && idx < itr.numEntries) - - idxPos := itr.getOffset(idx) - var h header - idxPos += uint32(h.Decode(itr.data[idxPos:])) - - // Convert to int before adding to avoid uint16 overflow. - idxKey := make([]byte, int(h.plen)+int(h.klen)) - copy(idxKey, itr.baseKey[:h.plen]) - copy(idxKey[h.plen:], itr.data[idxPos:idxPos+uint32(h.klen)]) - - return idxKey -} - // Seek brings us to the first block element that is >= input key. func (itr *blockIterator) Seek(key []byte, whence int) { itr.err = nil - startIndex := 0 // This tells from which index we should start binary search. switch whence { case origin: itr.Reset() case current: - startIndex = itr.currentIdx } - itr.Init() // If iterator is not initialized or has been reset. - - idx := sort.Search(itr.numEntries, func(idx int) bool { - // If idx is less than start index then just return false. - if idx < startIndex { - return false + var done bool + for itr.Init(); itr.Valid(); itr.Next() { + k := itr.Key() + if y.CompareKeys(k, key) >= 0 { + // We are done as k is >= key. + done = true + break } - - idxKey := itr.getKey(idx) - return y.CompareKeys(idxKey, key) >= 0 - }) - - // All keys in the block are less than the key to be sought. - if idx >= itr.numEntries { + } + if !done { itr.err = io.EOF - // Update currentIdx to len of entryOffsets, so that if Prev() is - // called just after Seek, it will return correct result. 
- itr.currentIdx = itr.numEntries - return } - - // Found first idx for which key is >= key to be sought. - itr.currentIdx = idx - itr.pos = itr.getOffset(itr.currentIdx) - var h header - itr.pos += uint32(h.Decode(itr.data[itr.pos:])) - itr.parseKV(h) } func (itr *blockIterator) SeekToFirst() { @@ -142,9 +102,8 @@ func (itr *blockIterator) SeekToFirst() { // SeekToLast brings us to the last element. Valid should return true. func (itr *blockIterator) SeekToLast() { itr.err = nil - - itr.Init() - itr.currentIdx = itr.numEntries + for itr.Init(); itr.Valid(); itr.Next() { + } itr.Prev() } @@ -171,15 +130,20 @@ func (itr *blockIterator) parseKV(h header) { func (itr *blockIterator) Next() { itr.init = true itr.err = nil - - itr.currentIdx++ - if itr.currentIdx >= itr.numEntries { + if itr.pos >= uint32(len(itr.data)) { itr.err = io.EOF return } var h header itr.pos += uint32(h.Decode(itr.data[itr.pos:])) + itr.last = h // Store the last header. + + if h.klen == 0 && h.plen == 0 { + // Last entry in the table. + itr.err = io.EOF + return + } // Populate baseKey if it isn't set yet. This would only happen for the first Next. if len(itr.baseKey) == 0 { @@ -195,20 +159,21 @@ func (itr *blockIterator) Prev() { return } itr.err = nil - - itr.currentIdx-- - y.AssertTrue(itr.currentIdx < itr.numEntries) - if itr.currentIdx < 0 { + if itr.last.prev == math.MaxUint32 { + // This is the first element of the block! itr.err = io.EOF + itr.pos = 0 return } - itr.pos = itr.getOffset(itr.currentIdx) + // Move back using current header's prev. 
+ itr.pos = itr.last.prev var h header y.AssertTruef(itr.pos < uint32(len(itr.data)), "%d %d", itr.pos, len(itr.data)) itr.pos += uint32(h.Decode(itr.data[itr.pos:])) itr.parseKV(h) + itr.last = h } func (itr *blockIterator) Key() []byte { @@ -317,7 +282,7 @@ func (itr *Iterator) seekFrom(key []byte, whence int) { idx := sort.Search(len(itr.t.blockIndex), func(idx int) bool { ko := itr.t.blockIndex[idx] - return y.CompareKeys(ko.Key, key) > 0 + return y.CompareKeys(ko.key, key) > 0 }) if idx == 0 { // The smallest key in our table is already strictly > key. We can return that. diff --git a/vendor/github.com/dgraph-io/badger/table/table.go b/vendor/github.com/dgraph-io/badger/table/table.go index ae5ef81ca79..0a1f42d464f 100644 --- a/vendor/github.com/dgraph-io/badger/table/table.go +++ b/vendor/github.com/dgraph-io/badger/table/table.go @@ -17,6 +17,8 @@ package table import ( + "bytes" + "crypto/sha256" "encoding/binary" "fmt" "io" @@ -28,8 +30,6 @@ import ( "sync" "sync/atomic" - "github.com/dgraph-io/badger/pb" - "github.com/AndreasBriese/bbloom" "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/y" @@ -38,6 +38,12 @@ import ( const fileSuffix = ".sst" +type keyOffset struct { + key []byte + offset int + len int +} + // TableInterface is useful for testing. type TableInterface interface { Smallest() []byte @@ -52,7 +58,7 @@ type Table struct { fd *os.File // Own fd. tableSize int // Initialized in OpenTable, using fd.Stat(). - blockIndex []*pb.BlockOffset + blockIndex []keyOffset ref int32 // For file garbage collection. Atomic. loadingMode options.FileLoadingMode @@ -65,7 +71,6 @@ type Table struct { bf bbloom.Bloom Checksum []byte - chkMode options.ChecksumVerificationMode // indicates when to verify checksum for blocks. 
} // IncrRef increments the refcount (having to do with whether the file should be deleted) @@ -102,46 +107,19 @@ func (t *Table) DecrRef() error { } type block struct { - offset int - data []byte - numEntries int // number of entries present in the block - entriesIndexStart int // start index of entryOffsets list - chkLen int // checksum length -} - -func (b block) verifyCheckSum() error { - readPos := len(b.data) - 4 - b.chkLen - if readPos < 0 { - // This should be rare, hence can create a error instead of having global error. - return fmt.Errorf("block does not contain checksum") - } - - cs := &pb.Checksum{} - if err := cs.Unmarshal(b.data[readPos : readPos+b.chkLen]); err != nil { - return y.Wrapf(err, "unable to unmarshal checksum for block") - } - - return y.VerifyChecksum(b.data[:readPos], cs) + offset int + data []byte } func (b block) NewIterator() *blockIterator { - bi := &blockIterator{ - data: b.data, - numEntries: b.numEntries, - entriesIndexStart: b.entriesIndexStart, - } - - return bi + return &blockIterator{data: b.data} } -// OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function -// entry. Returns a table with one reference count on it (decrementing which may delete the file! -// -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before -// deleting. Checksum for all blocks of table is verified based on value of chkMode. -// TODO:(Ashish): convert individual args to option struct. -func OpenTable(fd *os.File, mode options.FileLoadingMode, - chkMode options.ChecksumVerificationMode) (*Table, error) { - +// OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function +// entry. Returns a table with one reference count on it (decrementing which may delete the file! +// -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before +// deleting. 
+func OpenTable(fd *os.File, mode options.FileLoadingMode, cksum []byte) (*Table, error) { fileInfo, err := fd.Stat() if err != nil { // It's OK to ignore fd.Close() errs in this function because we have only read @@ -161,11 +139,22 @@ func OpenTable(fd *os.File, mode options.FileLoadingMode, ref: 1, // Caller is given one reference. id: id, loadingMode: mode, - chkMode: chkMode, } t.tableSize = int(fileInfo.Size()) + // We first load to RAM, so we can read the index and do checksum. + if err := t.loadToRAM(); err != nil { + return nil, err + } + // Enforce checksum before we read index. Otherwise, if the file was + // truncated, we'd end up with panics in readIndex. + if len(cksum) > 0 && !bytes.Equal(t.Checksum, cksum) { + return nil, fmt.Errorf( + "CHECKSUM_MISMATCH: Table checksum does not match checksum in MANIFEST."+ + " NOT including table %s. This would lead to missing data."+ + "\n sha256 %x Expected\n sha256 %x Found\n", filename, cksum, t.Checksum) + } if err := t.readIndex(); err != nil { return nil, y.Wrap(err) } @@ -186,20 +175,7 @@ func OpenTable(fd *os.File, mode options.FileLoadingMode, switch mode { case options.LoadToRAM: - if _, err := t.fd.Seek(0, io.SeekStart); err != nil { - return nil, err - } - t.mmap = make([]byte, t.tableSize) - n, err := t.fd.Read(t.mmap) - if err != nil { - // It's OK to ignore fd.Close() error because we have only read from the file. - _ = t.fd.Close() - return nil, y.Wrapf(err, "Failed to load file into RAM") - } - if n != t.tableSize { - return nil, errors.Errorf("Failed to read all bytes from the file."+ - "Bytes in file: %d Bytes actually Read: %d", t.tableSize, n) - } + // No need to do anything. t.mmap is already filled. 
case options.MemoryMap: t.mmap, err = y.Mmap(fd, false, fileInfo.Size()) if err != nil { @@ -211,14 +187,6 @@ func OpenTable(fd *os.File, mode options.FileLoadingMode, default: panic(fmt.Sprintf("Invalid loading mode: %v", mode)) } - - if t.chkMode == options.OnTableRead || t.chkMode == options.OnTableAndBlockRead { - if err := t.VerifyChecksum(); err != nil { - _ = fd.Close() - return nil, err - } - } - return t, nil } @@ -255,71 +223,76 @@ func (t *Table) readNoFail(off, sz int) []byte { } func (t *Table) readIndex() error { + if len(t.mmap) != t.tableSize { + panic("Table size does not match the read bytes") + } readPos := t.tableSize - // Read checksum len from the last 4 bytes. + // Read bloom filter. readPos -= 4 buf := t.readNoFail(readPos, 4) - checksumLen := int(binary.BigEndian.Uint32(buf)) - - // Read checksum. - expectedChk := &pb.Checksum{} - readPos -= checksumLen - buf = t.readNoFail(readPos, checksumLen) - if err := expectedChk.Unmarshal(buf); err != nil { - return err - } + bloomLen := int(binary.BigEndian.Uint32(buf)) + readPos -= bloomLen + data := t.readNoFail(readPos, bloomLen) + t.bf = bbloom.JSONUnmarshal(data) - // Read index size from the footer. readPos -= 4 buf = t.readNoFail(readPos, 4) - indexLen := int(binary.BigEndian.Uint32(buf)) - // Read index. - readPos -= indexLen - data := t.readNoFail(readPos, indexLen) + restartsLen := int(binary.BigEndian.Uint32(buf)) - if err := y.VerifyChecksum(data, expectedChk); err != nil { - return y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename()) + readPos -= 4 * restartsLen + buf = t.readNoFail(readPos, 4*restartsLen) + + offsets := make([]int, restartsLen) + for i := 0; i < restartsLen; i++ { + offsets[i] = int(binary.BigEndian.Uint32(buf[:4])) + buf = buf[4:] } - index := pb.TableIndex{} - err := index.Unmarshal(data) - y.Check(err) + // The last offset stores the end of the last block. 
+ for i := 0; i < len(offsets); i++ { + var o int + if i == 0 { + o = 0 + } else { + o = offsets[i-1] + } + + ko := keyOffset{ + offset: o, + len: offsets[i] - o, + } + t.blockIndex = append(t.blockIndex, ko) + } + + // Execute this index read serially, because we already have table data in memory. + var h header + for idx := range t.blockIndex { + ko := &t.blockIndex[idx] + + hbuf := t.readNoFail(ko.offset, h.Size()) + h.Decode(hbuf) + y.AssertTrue(h.plen == 0) + + key := t.readNoFail(ko.offset+len(hbuf), int(h.klen)) + ko.key = append([]byte{}, key...) + } - t.bf = bbloom.JSONUnmarshal(index.BloomFilter) - t.blockIndex = index.Offsets return nil } -func (t *Table) block(idx int) (*block, error) { +func (t *Table) block(idx int) (block, error) { y.AssertTruef(idx >= 0, "idx=%d", idx) if idx >= len(t.blockIndex) { - return nil, errors.New("block out of index") + return block{}, errors.New("block out of index") } ko := t.blockIndex[idx] - blk := &block{ - offset: int(ko.Offset), + blk := block{ + offset: ko.offset, } var err error - blk.data, err = t.read(blk.offset, int(ko.Len)) - - // Read meta data related to block. - readPos := len(blk.data) - 4 // First read checksum length. - blk.chkLen = int(binary.BigEndian.Uint32(blk.data[readPos : readPos+4])) - - // Skip reading checksum, and move position to read numEntries in block. - readPos -= (blk.chkLen + 4) - blk.numEntries = int(binary.BigEndian.Uint32(blk.data[readPos : readPos+4])) - blk.entriesIndexStart = readPos - (blk.numEntries * 4) - - // Verify checksum on if checksum verification mode is OnRead on OnStartAndRead. - if t.chkMode == options.OnBlockRead || t.chkMode == options.OnTableAndBlockRead { - if err = blk.verifyCheckSum(); err != nil { - return nil, err - } - } - + blk.data, err = t.read(blk.offset, ko.len) return blk, err } @@ -342,30 +315,6 @@ func (t *Table) ID() uint64 { return t.id } // bloom filter lookup. 
func (t *Table) DoesNotHave(key []byte) bool { return !t.bf.Has(key) } -// VerifyChecksum verifies checksum for all blocks of table. This function is called by -// OpenTable() function. This function is also called inside levelsController.VerifyChecksum(). -func (t *Table) VerifyChecksum() error { - for i, os := range t.blockIndex { - b, err := t.block(i) - if err != nil { - return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d", - t.Filename(), i, os.Offset) - } - - // OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum - // on block, verification would be done while reading block itself. - if !(t.chkMode == options.OnBlockRead || t.chkMode == options.OnTableAndBlockRead) { - if err = b.verifyCheckSum(); err != nil { - return y.Wrapf(err, - "checksum validation failed for table: %s, block: %d, offset:%d", - t.Filename(), i, os.Offset) - } - } - } - - return nil -} - // ParseFileID reads the file id out of a filename. func ParseFileID(name string) (uint64, bool) { name = path.Base(name) @@ -392,3 +341,20 @@ func IDToFilename(id uint64) string { func NewFilename(id uint64, dir string) string { return filepath.Join(dir, IDToFilename(id)) } + +func (t *Table) loadToRAM() error { + if _, err := t.fd.Seek(0, io.SeekStart); err != nil { + return err + } + t.mmap = make([]byte, t.tableSize) + sum := sha256.New() + tee := io.TeeReader(t.fd, sum) + read, err := tee.Read(t.mmap) + if err != nil || read != t.tableSize { + return y.Wrapf(err, "Unable to load file in memory. Table file: %s", t.Filename()) + } + t.Checksum = sum.Sum(nil) + y.NumReads.Add(1) + y.NumBytesRead.Add(int64(read)) + return nil +} diff --git a/vendor/github.com/dgraph-io/badger/trie/trie.go b/vendor/github.com/dgraph-io/badger/trie/trie.go deleted file mode 100644 index f856869c53f..00000000000 --- a/vendor/github.com/dgraph-io/badger/trie/trie.go +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright 2019 Dgraph Labs, Inc. 
and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package trie - -type node struct { - children map[byte]*node - ids []uint64 -} - -func newNode() *node { - return &node{ - children: make(map[byte]*node), - ids: []uint64{}, - } -} - -// Trie datastructure. -type Trie struct { - root *node -} - -// NewTrie returns Trie. -func NewTrie() *Trie { - return &Trie{ - root: newNode(), - } -} - -// Add adds the id in the trie for the given prefix path. -func (t *Trie) Add(prefix []byte, id uint64) { - node := t.root - for _, val := range prefix { - child, ok := node.children[val] - if !ok { - child = newNode() - node.children[val] = child - } - node = child - } - // We only need to add the id to the last node of the given prefix. - node.ids = append(node.ids, id) -} - -// Get returns prefix matched ids for the given key. -func (t *Trie) Get(key []byte) map[uint64]struct{} { - out := make(map[uint64]struct{}) - node := t.root - for _, val := range key { - child, ok := node.children[val] - if !ok { - break - } - // We need ids of the all the node in the matching key path. - for _, id := range child.ids { - out[id] = struct{}{} - } - node = child - } - return out -} - -// Delete will delete the id if the id exist in the given index path. 
-func (t *Trie) Delete(index []byte, id uint64) { - node := t.root - for _, val := range index { - child, ok := node.children[val] - if !ok { - return - } - node = child - } - // We're just removing the id not the hanging path. - out := node.ids[:0] - for _, val := range node.ids { - if val != id { - out = append(out, val) - } - } - for i := len(out); i < len(node.ids); i++ { - node.ids[i] = 0 // garbage collecting - } - node.ids = out -} diff --git a/vendor/github.com/dgraph-io/badger/value.go b/vendor/github.com/dgraph-io/badger/value.go index c98693b10b7..ff28b7566e8 100644 --- a/vendor/github.com/dgraph-io/badger/value.go +++ b/vendor/github.com/dgraph-io/badger/value.go @@ -74,6 +74,29 @@ type logFile struct { loadingMode options.FileLoadingMode } +// openReadOnly assumes that we have a write lock on logFile. +func (lf *logFile) openReadOnly() error { + var err error + lf.fd, err = os.OpenFile(lf.path, os.O_RDONLY, 0666) + if err != nil { + return errors.Wrapf(err, "Unable to open %q as RDONLY.", lf.path) + } + + fi, err := lf.fd.Stat() + if err != nil { + return errors.Wrapf(err, "Unable to check stat for %q", lf.path) + } + y.AssertTrue(fi.Size() <= math.MaxUint32) + lf.size = uint32(fi.Size()) + + if err = lf.mmap(fi.Size()); err != nil { + _ = lf.fd.Close() + return y.Wrapf(err, "Unable to map file: %q", fi.Name()) + } + + return nil +} + func (lf *logFile) mmap(size int64) (err error) { if lf.loadingMode != options.MemoryMap { // Nothing to do @@ -125,21 +148,33 @@ func (lf *logFile) read(p valuePointer, s *y.Slice) (buf []byte, err error) { } func (lf *logFile) doneWriting(offset uint32) error { - // Sync before acquiring lock. (We call this from write() and thus know we have shared access + // Sync before acquiring lock. (We call this from write() and thus know we have shared access // to the fd.) if err := y.FileSync(lf.fd); err != nil { return errors.Wrapf(err, "Unable to sync value log: %q", lf.path) } - + // Close and reopen the file read-only. 
Acquire lock because fd will become invalid for a bit. + // Acquiring the lock is bad because, while we don't hold the lock for a long time, it forces + // one batch of readers wait for the preceding batch of readers to finish. + // + // If there's a benefit to reopening the file read-only, it might be on Windows. I don't know + // what the benefit is. Consider keeping the file read-write, or use fcntl to change + // permissions. + lf.lock.Lock() + defer lf.lock.Unlock() + if err := lf.munmap(); err != nil { + return err + } // TODO: Confirm if we need to run a file sync after truncation. // Truncation must run after unmapping, otherwise Windows would crap itself. if err := lf.fd.Truncate(int64(offset)); err != nil { return errors.Wrapf(err, "Unable to truncate file: %q", lf.path) } + if err := lf.fd.Close(); err != nil { + return errors.Wrapf(err, "Unable to close value log: %q", lf.path) + } - // Previously we used to close the file after it was written and reopen it in read-only mode. - // We no longer open files in read-only mode. We keep all vlog files open in read-write mode. - return nil + return lf.openReadOnly() } // You must hold lf.lock to sync() @@ -362,10 +397,9 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { } ne.Value = append([]byte{}, e.Value...) - es := int64(ne.estimateSize(vlog.opt.ValueThreshold)) - // Ensure length and size of wb is within transaction limits. - if int64(len(wb)+1) > vlog.opt.maxBatchCount || - size+es > vlog.opt.maxBatchSize { + wb = append(wb, ne) + size += int64(e.estimateSize(vlog.opt.ValueThreshold)) + if size >= 64*mi { tr.LazyPrintf("request has %d entries, size %d", len(wb), size) if err := vlog.db.batchSet(wb); err != nil { return err @@ -373,8 +407,6 @@ func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error { size = 0 wb = wb[:0] } - wb = append(wb, ne) - size += es } else { vlog.db.opt.Warningf("This entry should have been caught. 
%+v\n", e) } @@ -682,6 +714,18 @@ func errFile(err error, path string, msg string) error { } func (vlog *valueLog) replayLog(lf *logFile, offset uint32, replayFn logEntry) error { + var err error + mode := os.O_RDONLY + if vlog.opt.Truncate { + // We should open the file in RW mode, so it can be truncated. + mode = os.O_RDWR + } + lf.fd, err = os.OpenFile(lf.path, mode, 0) + if err != nil { + return errFile(err, lf.path, "Open file") + } + defer lf.fd.Close() + fi, err := lf.fd.Stat() if err != nil { return errFile(err, lf.path, "Unable to run file.Stat") @@ -738,23 +782,13 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { for _, fid := range fids { lf, ok := vlog.filesMap[fid] y.AssertTrue(ok) - var flags uint32 - switch { - case vlog.opt.ReadOnly: - // If we have read only, we don't need SyncWrites. - flags |= y.ReadOnly - // Set sync flag. - case vlog.opt.SyncWrites: - flags |= y.Sync - } - - // Open log file "lf" in read-write mode. - if err := lf.open(vlog.fpath(lf.fid), flags); err != nil { - return err - } + // This file is before the value head pointer. So, we don't need to // replay it, and can just open it in readonly mode. if fid < ptr.Fid { + if err := lf.openReadOnly(); err != nil { + return err + } continue } @@ -779,6 +813,25 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { return err } vlog.db.opt.Infof("Replay took: %s\n", time.Since(now)) + + if fid < vlog.maxFid { + if err := lf.openReadOnly(); err != nil { + return err + } + } else { + var flags uint32 + switch { + case vlog.opt.ReadOnly: + // If we have read only, we don't need SyncWrites. + flags |= y.ReadOnly + case vlog.opt.SyncWrites: + flags |= y.Sync + } + var err error + if lf.fd, err = y.OpenExistingFile(vlog.fpath(fid), flags); err != nil { + return errFile(err, lf.path, "Open existing file") + } + } } // Seek to the end to start writing. 
@@ -800,32 +853,7 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error { return errFile(err, last.path, "Map log file") } if err := vlog.populateDiscardStats(); err != nil { - // Print the error and continue. We don't want to prevent value log open if there's an error - // with the fetching discards stats. - db.opt.Errorf("Failed to populate discard stats: %s", err) - } - return nil -} - -func (lf *logFile) open(filename string, flags uint32) error { - var err error - if lf.fd, err = y.OpenExistingFile(filename, flags); err != nil { - return errors.Wrapf(err, "Open existing file: %q", lf.path) - } - fstat, err := lf.fd.Stat() - if err != nil { - return errors.Wrapf(err, "Unable to check stat for %q", lf.path) - } - sz := fstat.Size() - if sz == 0 { - // File is empty. We don't need to mmap it. Return. - return nil - } - y.AssertTrue(sz <= math.MaxUint32) - lf.size = uint32(sz) - if err = lf.mmap(sz); err != nil { - _ = lf.fd.Close() - return errors.Wrapf(err, "Unable to map file: %q", fstat.Name()) + return err } return nil } @@ -1365,13 +1393,9 @@ func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) error { // flushDiscardStats inserts discard stats into badger. Returns error on failure. func (vlog *valueLog) flushDiscardStats() error { - vlog.lfDiscardStats.Lock() if len(vlog.lfDiscardStats.m) == 0 { - vlog.lfDiscardStats.Unlock() return nil } - vlog.lfDiscardStats.Unlock() - entries := []*Entry{{ Key: y.KeyWithTs(lfDiscardStatsKey, 1), Value: vlog.encodedDiscardStats(), diff --git a/vendor/github.com/dgraph-io/badger/y/checksum.go b/vendor/github.com/dgraph-io/badger/y/checksum.go deleted file mode 100644 index 6bceada4c5e..00000000000 --- a/vendor/github.com/dgraph-io/badger/y/checksum.go +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2019 Dgraph Labs, Inc. and Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package y - -import ( - "hash/crc32" - - "github.com/dgraph-io/badger/pb" - - "github.com/cespare/xxhash" - "github.com/pkg/errors" -) - -// ErrChecksumMismatch is returned at checksum mismatch. -var ErrChecksumMismatch = errors.New("checksum mismatch") - -// CalculateChecksum calculates checksum for data using ct checksum type. -func CalculateChecksum(data []byte, ct pb.Checksum_Algorithm) uint64 { - switch ct { - case pb.Checksum_CRC32C: - return uint64(crc32.Checksum(data, CastagnoliCrcTable)) - case pb.Checksum_XXHash64: - return xxhash.Sum64(data) - default: - panic("checksum type not supported") - } -} - -// VerifyChecksum validates the checksum for the data against the given expected checksum. -func VerifyChecksum(data []byte, expected *pb.Checksum) error { - actual := CalculateChecksum(data, expected.Algo) - if actual != expected.Sum { - return Wrapf(ErrChecksumMismatch, "actual: %d, expected: %d", actual, expected) - } - return nil -} diff --git a/vendor/github.com/dgraph-io/badger/y/iterator.go b/vendor/github.com/dgraph-io/badger/y/iterator.go index 13f0bb2cecd..719e8ec8ead 100644 --- a/vendor/github.com/dgraph-io/badger/y/iterator.go +++ b/vendor/github.com/dgraph-io/badger/y/iterator.go @@ -47,14 +47,14 @@ func sizeVarint(x uint64) (n int) { } // EncodedSize is the size of the ValueStruct when encoded -func (v *ValueStruct) EncodedSize() uint32 { +func (v *ValueStruct) EncodedSize() uint16 { sz := len(v.Value) + 2 // meta, usermeta. 
if v.ExpiresAt == 0 { - return uint32(sz + 1) + return uint16(sz + 1) } enc := sizeVarint(v.ExpiresAt) - return uint32(sz + enc) + return uint16(sz + enc) } // Decode uses the length of the slice to infer the length of the Value field. diff --git a/vendor/github.com/dgraph-io/badger/y/watermark.go b/vendor/github.com/dgraph-io/badger/y/watermark.go index d25a58c7dcb..10ca00e7e38 100644 --- a/vendor/github.com/dgraph-io/badger/y/watermark.go +++ b/vendor/github.com/dgraph-io/badger/y/watermark.go @@ -190,35 +190,17 @@ func (w *WaterMark) process(closer *Closer) { until = min loops++ } - - if until != doneUntil { - AssertTrue(atomic.CompareAndSwapUint64(&w.doneUntil, doneUntil, until)) - w.elog.Printf("%s: Done until %d. Loops: %d\n", w.Name, until, loops) - } - - notifyAndRemove := func(idx uint64, toNotify []chan struct{}) { + for i := doneUntil + 1; i <= until; i++ { + toNotify := waiters[i] for _, ch := range toNotify { close(ch) } - delete(waiters, idx) // Release the memory back. + delete(waiters, i) // Release the memory back. + } + if until != doneUntil { + AssertTrue(atomic.CompareAndSwapUint64(&w.doneUntil, doneUntil, until)) + w.elog.Printf("%s: Done until %d. Loops: %d\n", w.Name, until, loops) } - - if until-doneUntil <= uint64(len(waiters)) { - // Issue #908 showed that if doneUntil is close to 2^60, while until is zero, this loop - // can hog up CPU just iterating over integers creating a busy-wait loop. So, only do - // this path if until - doneUntil is less than the number of waiters. - for idx := doneUntil + 1; idx <= until; idx++ { - if toNotify, ok := waiters[idx]; ok { - notifyAndRemove(idx, toNotify) - } - } - } else { - for idx, toNotify := range waiters { - if idx <= until { - notifyAndRemove(idx, toNotify) - } - } - } // end of notifying waiters. 
} for { diff --git a/vendor/vendor.json b/vendor/vendor.json index fc96d22b549..47abc3dd359 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -429,22 +429,22 @@ "revisionTime": "2016-09-07T16:21:46Z" }, { - "checksumSHA1": "sV8+Bg5y05IDCoByoAm5X9eMb/4=", + "checksumSHA1": "Nn0ssufNYt0MsdYOBrw4BScRPh0=", "path": "github.com/dgraph-io/badger", - "revision": "50c90edf28f20390d9f72aeac28b67746932f3a6", - "revisionTime": "2019-07-18T18:21:13Z" + "revision": "efb9d9d15d7f7baa656e04933058f006c33a8d0f", + "revisionTime": "2019-10-24T17:21:50Z" }, { - "checksumSHA1": "On87ObyZ4RLZWWmDtW2M5gY997I=", + "checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=", "path": "github.com/dgraph-io/badger/options", - "revision": "50c90edf28f20390d9f72aeac28b67746932f3a6", - "revisionTime": "2019-07-18T18:21:13Z" + "revision": "efb9d9d15d7f7baa656e04933058f006c33a8d0f", + "revisionTime": "2019-10-24T17:21:50Z" }, { - "checksumSHA1": "bKXWdNArXOe0NO6UFEnvVhX2YAA=", + "checksumSHA1": "SV7o4+eEK7/XNWC7H7Z5vWCoHP0=", "path": "github.com/dgraph-io/badger/pb", - "revision": "50c90edf28f20390d9f72aeac28b67746932f3a6", - "revisionTime": "2019-07-18T18:21:13Z" + "revision": "efb9d9d15d7f7baa656e04933058f006c33a8d0f", + "revisionTime": "2019-10-24T17:21:50Z" }, { "checksumSHA1": "d8wE18ae6lOhmJqh0jwwhmQCkII=", @@ -453,16 +453,16 @@ "revisionTime": "2018-11-26T21:07:12Z" }, { - "checksumSHA1": "zWLyKFwJcRNWzpvE+QDrsqi9UlE=", + "checksumSHA1": "7dxXjygrynDxvPE1UBHFDxVn7kE=", "path": "github.com/dgraph-io/badger/skl", - "revision": "50c90edf28f20390d9f72aeac28b67746932f3a6", - "revisionTime": "2019-07-18T18:21:13Z" + "revision": "efb9d9d15d7f7baa656e04933058f006c33a8d0f", + "revisionTime": "2019-10-24T17:21:50Z" }, { - "checksumSHA1": "9cwKWtAlTj4V/qSpYOwyvL1VkgU=", + "checksumSHA1": "0nZcXky/WdTWm3io4j6z2XbH4oM=", "path": "github.com/dgraph-io/badger/table", - "revision": "50c90edf28f20390d9f72aeac28b67746932f3a6", - "revisionTime": "2019-07-18T18:21:13Z" + "revision": 
"efb9d9d15d7f7baa656e04933058f006c33a8d0f", + "revisionTime": "2019-10-24T17:21:50Z" }, { "checksumSHA1": "JxUxrgSrNNTqbX3tqzh3dQIG+uU=", @@ -471,10 +471,10 @@ "revisionTime": "2019-07-18T18:21:13Z" }, { - "checksumSHA1": "HxjNg9sBt3V1Fcmtau1IFZL2OcI=", + "checksumSHA1": "6Ya7Kr114SVcXRqa3UaglreBrY8=", "path": "github.com/dgraph-io/badger/y", - "revision": "50c90edf28f20390d9f72aeac28b67746932f3a6", - "revisionTime": "2019-07-18T18:21:13Z" + "revision": "efb9d9d15d7f7baa656e04933058f006c33a8d0f", + "revisionTime": "2019-10-24T17:21:50Z" }, { "checksumSHA1": "b0SSBiN/qXocMxGVb4kBfetDrYg=", @@ -591,10 +591,10 @@ "revisionTime": "2016-01-25T20:49:56Z" }, { - "checksumSHA1": "C8nYObwbo2oyODQbIT83lY37ajM=", + "checksumSHA1": "Q3FteGbNvRRUMJqbYbmrcBd2DMo=", "path": "github.com/golang/protobuf/proto", - "revision": "1680a479a2cfb3fa22b972af7e36d0a0fde47bf8", - "revisionTime": "2019-09-20T23:43:18Z" + "revision": "ed6926b37a637426117ccab59282c3839528a700", + "revisionTime": "2019-10-22T19:55:53Z" }, { "checksumSHA1": "z4copNgeTN77OymdDKqLaIK/vSI=",