diff --git a/appveyor.yml b/appveyor.yml index 36853e9d1..ac3a9505c 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -17,10 +17,11 @@ environment: # scripts that run after cloning repository install: - - set PATH=%GOPATH%\bin;c:\go\bin;%PATH% + - set PATH=%GOPATH%\bin;c:\go\bin;c:\msys64\mingw64\bin;%PATH% - go version - go env - python --version + - gcc --version # To run your custom scripts instead of automatic MSBuild build_script: diff --git a/db.go b/db.go index 493edf0a6..39b12dd98 100644 --- a/db.go +++ b/db.go @@ -891,7 +891,7 @@ type flushTask struct { // handleFlushTask must be run serially. func (db *DB) handleFlushTask(ft flushTask) error { - // There can be a scnerio, when empty memtable is flushed. For example, memtable is empty and + // There can be a scenario, when empty memtable is flushed. For example, memtable is empty and // after writing request to value log, rotation count exceeds db.LogRotatesToFlush. if ft.mt.Empty() { return nil @@ -911,16 +911,13 @@ func (db *DB) handleFlushTask(ft flushTask) error { if err != nil { return y.Wrapf(err, "failed to get datakey in db.handleFlushTask") } - bopts := table.Options{ - BlockSize: db.opt.BlockSize, - BloomFalsePositive: db.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(db.opt) + bopts.DataKey = dk tableData := buildL0Table(ft, bopts) fileID := db.lc.reserveFileID() if db.opt.KeepL0InMemory { - tbl, err := table.OpenInMemoryTable(tableData, fileID, dk) + tbl, err := table.OpenInMemoryTable(tableData, fileID, &bopts) if err != nil { return errors.Wrapf(err, "failed to open table in memory") } @@ -945,13 +942,7 @@ func (db *DB) handleFlushTask(ft flushTask) error { // Do dir sync as best effort. No need to return due to an error there. 
db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr) } - - opts := table.Options{ - LoadingMode: db.opt.TableLoadingMode, - ChkMode: db.opt.ChecksumVerificationMode, - DataKey: dk, - } - tbl, err := table.OpenTable(fd, opts) + tbl, err := table.OpenTable(fd, bopts) if err != nil { db.elog.Printf("ERROR while opening table: %v", err) return err diff --git a/db2_test.go b/db2_test.go index 5b19aba03..b9ad5abbd 100644 --- a/db2_test.go +++ b/db2_test.go @@ -505,9 +505,10 @@ func TestCompactionFilePicking(t *testing.T) { // addToManifest function is used in TestCompactionFilePicking. It adds table to db manifest. func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) { change := &pb.ManifestChange{ - Id: tab.ID(), - Op: pb.ManifestChange_CREATE, - Level: level, + Id: tab.ID(), + Op: pb.ManifestChange_CREATE, + Level: level, + Compression: uint32(tab.CompressionType()), } require.NoError(t, db.manifest.addChanges([]*pb.ManifestChange{change}), "unable to add to manifest") @@ -516,10 +517,7 @@ func addToManifest(t *testing.T, db *DB, tab *table.Table, level uint32) { // createTableWithRange function is used in TestCompactionFilePicking. It creates // a table with key starting from start and ending with end. 
func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table { - bopts := table.Options{ - BlockSize: db.opt.BlockSize, - BloomFalsePositive: db.opt.BloomFalsePositive, - } + bopts := buildTableOptions(db.opt) b := table.NewTableBuilder(bopts) nums := []int{start, end} for _, i := range nums { @@ -537,8 +535,7 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table { _, err = fd.Write(b.Finish()) require.NoError(t, err, "unable to write to file") - opts := table.Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification} - tab, err := table.OpenTable(fd, opts) + tab, err := table.OpenTable(fd, bopts) require.NoError(t, err) return tab } diff --git a/db_test.go b/db_test.go index c51dec4dd..c4d534499 100644 --- a/db_test.go +++ b/db_test.go @@ -588,7 +588,7 @@ func TestIterate2Basic(t *testing.T) { }) } -func TestLoadAndEncryption(t *testing.T) { +func TestLoad(t *testing.T) { testLoad := func(t *testing.T, opt Options) { dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err) @@ -642,14 +642,33 @@ func TestLoadAndEncryption(t *testing.T) { sort.Slice(fileIDs, func(i, j int) bool { return fileIDs[i] < fileIDs[j] }) fmt.Printf("FileIDs: %v\n", fileIDs) } - t.Run("TestLoad Without Encryption", func(t *testing.T) { - testLoad(t, getTestOptions("")) + t.Run("TestLoad Without Encryption/Compression", func(t *testing.T) { + opt := getTestOptions("") + opt.Compression = options.None + testLoad(t, opt) }) - t.Run("TestLoad With Encryption", func(t *testing.T) { + t.Run("TestLoad With Encryption and no compression", func(t *testing.T) { key := make([]byte, 32) _, err := rand.Read(key) require.NoError(t, err) - testLoad(t, getTestOptions("").WithEncryptionKey(key)) + opt := getTestOptions("") + opt.EncryptionKey = key + opt.Compression = options.None + testLoad(t, opt) + }) + t.Run("TestLoad With Encryption and compression", func(t *testing.T) { + key := make([]byte, 32) + _, err := rand.Read(key) + 
require.NoError(t, err) + opt := getTestOptions("") + opt.EncryptionKey = key + opt.Compression = options.ZSTD + testLoad(t, opt) + }) + t.Run("TestLoad without Encryption and with compression", func(t *testing.T) { + opt := getTestOptions("") + opt.Compression = options.ZSTD + testLoad(t, opt) }) } diff --git a/go.mod b/go.mod index ed247de91..c52fb3ace 100644 --- a/go.mod +++ b/go.mod @@ -3,12 +3,14 @@ module github.com/dgraph-io/badger go 1.12 require ( + github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 github.com/cespare/xxhash/v2 v2.1.0 // indirect github.com/dgraph-io/ristretto v0.0.0-20190916120426-cd2835491e0e github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 + github.com/golang/snappy v0.0.1 github.com/pkg/errors v0.8.1 github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cobra v0.0.5 diff --git a/go.sum b/go.sum index 8f75b6eca..4a485aafa 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM= +github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VictoriaMetrics/fastcache v1.5.1/go.mod h1:+jv9Ckb+za/P1ZRg/sulP5Ni1v49daAVERr0H3CuscE= @@ -31,6 +33,7 @@ github.com/goburrow/cache v0.1.0/go.mod h1:8oxkfud4hvjO4tNjEKZfEd+LrpDVDlBIauGYs github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.1 
h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= diff --git a/iterator_test.go b/iterator_test.go index 9c4461ecc..548b6242c 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -79,9 +79,9 @@ func TestPickSortTables(t *testing.T) { genTables := func(mks ...MockKeys) []*table.Table { out := make([]*table.Table, 0) for _, mk := range mks { - f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}}) opts := table.Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + f := buildTable(t, [][]string{{mk.small, "some value"}, {mk.large, "some value"}}, opts) tbl, err := table.OpenTable(f, opts) require.NoError(t, err) out = append(out, tbl) diff --git a/levels.go b/levels.go index ed2144f34..e5083f562 100644 --- a/levels.go +++ b/levels.go @@ -154,12 +154,11 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { rerr = errors.Wrapf(err, "Error while reading datakey") return } - opts := table.Options{ - LoadingMode: db.opt.TableLoadingMode, - ChkMode: db.opt.ChecksumVerificationMode, - DataKey: dk, - } - t, err := table.OpenTable(fd, opts) + topt := buildTableOptions(db.opt) + // Set compression from table manifest. 
+ topt.Compression = tf.Compression + topt.DataKey = dk + t, err := table.OpenTable(fd, topt) if err != nil { if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") { db.opt.Errorf(err.Error()) @@ -508,11 +507,8 @@ func (s *levelsController) compactBuildTables( return nil, nil, y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables") } - bopts := table.Options{ - BlockSize: s.kv.opt.BlockSize, - BloomFalsePositive: s.kv.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(s.kv.opt) + bopts.DataKey = dk builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 for ; it.Valid(); it.Next() { @@ -593,13 +589,7 @@ func (s *levelsController) compactBuildTables( if _, err := fd.Write(builder.Finish()); err != nil { return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID) } - - opts := table.Options{ - LoadingMode: s.kv.opt.TableLoadingMode, - ChkMode: s.kv.opt.ChecksumVerificationMode, - DataKey: builder.DataKey(), - } - tbl, err := table.OpenTable(fd, opts) + tbl, err := table.OpenTable(fd, bopts) // decrRef is added below. return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name()) } @@ -659,7 +649,7 @@ func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeS changes := []*pb.ManifestChange{} for _, table := range newTables { changes = append(changes, - newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID())) + newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType())) } for _, table := range cd.top { // Add a delete change only if the table is not in memory. @@ -895,7 +885,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { // the proper order. (That means this update happens before that of some compaction which // deletes the table.) 
err := s.kv.manifest.addChanges([]*pb.ManifestChange{ - newCreateChange(t.ID(), 0, t.KeyID()), + newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()), }) if err != nil { return err diff --git a/manifest.go b/manifest.go index f54508171..f7a73294e 100644 --- a/manifest.go +++ b/manifest.go @@ -27,6 +27,7 @@ import ( "path/filepath" "sync" + "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" "github.com/golang/protobuf/proto" @@ -65,11 +66,12 @@ type levelManifest struct { Tables map[uint64]struct{} // Set of table id's } -// TableManifest contains information about a specific level +// TableManifest contains information about a specific table // in the LSM tree. type TableManifest struct { - Level uint8 - KeyID uint64 + Level uint8 + KeyID uint64 + Compression options.CompressionType } // manifestFile holds the file pointer (and other info) about the manifest file, which is a log @@ -100,7 +102,7 @@ const ( func (m *Manifest) asChanges() []*pb.ManifestChange { changes := make([]*pb.ManifestChange, 0, len(m.Tables)) for id, tm := range m.Tables { - changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID)) + changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression)) } return changes } @@ -395,8 +397,9 @@ func applyManifestChange(build *Manifest, tc *pb.ManifestChange) error { return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id) } build.Tables[tc.Id] = TableManifest{ - Level: uint8(tc.Level), - KeyID: tc.KeyId, + Level: uint8(tc.Level), + KeyID: tc.KeyId, + Compression: options.CompressionType(tc.Compression), } for len(build.Levels) <= int(tc.Level) { build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})}) @@ -428,14 +431,16 @@ func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet) error { return nil } -func newCreateChange(id uint64, level int, keyID uint64) *pb.ManifestChange { +func newCreateChange( + id uint64, level 
int, keyID uint64, c options.CompressionType) *pb.ManifestChange { return &pb.ManifestChange{ - Id: id, - Op: pb.ManifestChange_CREATE, - Level: uint32(level), - KeyId: keyID, + Id: id, + Op: pb.ManifestChange_CREATE, + Level: uint32(level), + KeyId: keyID, + // Hard coding it, since we're supporting only AES for now. EncryptionAlgo: pb.EncryptionAlgo_aes, - // Hardcoding it, since we're supporting only AES for now. + Compression: uint32(c), } } diff --git a/manifest_test.go b/manifest_test.go index 2c9eada08..a0a0c01ae 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -112,7 +112,7 @@ func key(prefix string, i int) string { return prefix + fmt.Sprintf("%04d", i) } -func buildTestTable(t *testing.T, prefix string, n int) *os.File { +func buildTestTable(t *testing.T, prefix string, n int, opts table.Options) *os.File { y.AssertTrue(n <= 10000) keyValues := make([][]string, n) for i := 0; i < n; i++ { @@ -120,13 +120,18 @@ func buildTestTable(t *testing.T, prefix string, n int) *os.File { v := fmt.Sprintf("%d", i) keyValues[i] = []string{k, v} } - return buildTable(t, keyValues) + return buildTable(t, keyValues, opts) } // TODO - Move these to somewhere where table package can also use it. // keyValues is n by 2 where n is number of pairs. -func buildTable(t *testing.T, keyValues [][]string) *os.File { - bopts := table.Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} +func buildTable(t *testing.T, keyValues [][]string, bopts table.Options) *os.File { + if bopts.BloomFalsePositive == 0 { + bopts.BloomFalsePositive = 0.01 + } + if bopts.BlockSize == 0 { + bopts.BlockSize = 4 * 1024 + } b := table.NewTableBuilder(bopts) defer b.Close() // TODO: Add test for file garbage collection here. No files should be left after the tests here. 
@@ -166,8 +171,8 @@ func TestOverlappingKeyRangeError(t *testing.T) { lh0 := newLevelHandler(kv, 0) lh1 := newLevelHandler(kv, 1) - f := buildTestTable(t, "k", 2) opts := table.Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + f := buildTestTable(t, "k", 2, opts) t1, err := table.OpenTable(f, opts) require.NoError(t, err) defer t1.DecrRef() @@ -188,7 +193,7 @@ func TestOverlappingKeyRangeError(t *testing.T) { require.Equal(t, true, done) lc.runCompactDef(0, cd) - f = buildTestTable(t, "l", 2) + f = buildTestTable(t, "l", 2, opts) t2, err := table.OpenTable(f, opts) require.NoError(t, err) defer t2.DecrRef() @@ -220,13 +225,13 @@ func TestManifestRewrite(t *testing.T) { require.Equal(t, 0, m.Deletions) err = mf.addChanges([]*pb.ManifestChange{ - newCreateChange(0, 0, 0), + newCreateChange(0, 0, 0, 0), }) require.NoError(t, err) for i := uint64(0); i < uint64(deletionsThreshold*3); i++ { ch := []*pb.ManifestChange{ - newCreateChange(i+1, 0, 0), + newCreateChange(i+1, 0, 0, 0), newDeleteChange(i), } err := mf.addChanges(ch) diff --git a/options.go b/options.go index 0068d541f..293e09f4c 100644 --- a/options.go +++ b/options.go @@ -20,6 +20,7 @@ import ( "time" "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/badger/table" ) // Note: If you add a new option X make sure you also add a WithX method on Options. @@ -46,6 +47,7 @@ type Options struct { ReadOnly bool Truncate bool Logger Logger + Compression options.CompressionType EventLogging bool // Fine tuning options. @@ -112,6 +114,7 @@ func DefaultOptions(path string) Options { NumVersionsToKeep: 1, CompactL0OnClose: true, KeepL0InMemory: true, + Compression: options.ZSTD, // Nothing to read/write value log using standard File I/O // MemoryMap to mmap() the value log files // (2^30 - 1)*2 when mmapping < 2^31 - 1, max int32. 
@@ -129,6 +132,16 @@ func DefaultOptions(path string) Options { } } +func buildTableOptions(opt Options) table.Options { + return table.Options{ + BlockSize: opt.BlockSize, + BloomFalsePositive: opt.BloomFalsePositive, + LoadingMode: opt.TableLoadingMode, + ChkMode: opt.ChecksumVerificationMode, + Compression: opt.Compression, + } +} + const ( maxValueThreshold = (1 << 20) // 1 MB ) @@ -461,3 +474,12 @@ func (opt Options) WithKeepL0InMemory(val bool) Options { opt.KeepL0InMemory = val return opt } + +// WithCompressionType returns a new Options value with CompressionType set to the given value. +// +// When compression type is set, every block will be compressed using the specified algorithm. +// This option doesn't affect existing tables. Only the newly created tables will be compressed. +func (opt Options) WithCompressionType(cType options.CompressionType) Options { + opt.Compression = cType + return opt +} diff --git a/options/options.go b/options/options.go index f73553ab5..564f780f1 100644 --- a/options/options.go +++ b/options/options.go @@ -43,3 +43,15 @@ const ( // on SSTable opening and on every block read. OnTableAndBlockRead ) + +// CompressionType specifies how a block should be compressed. +type CompressionType uint32 + +const ( + // None mode indicates that a block is not compressed. + None CompressionType = 0 + // Snappy mode indicates that a block is compressed using Snappy algorithm. + Snappy CompressionType = 1 + // ZSTD mode indicates that a block is compressed using ZSTD algorithm. 
+ ZSTD CompressionType = 2 +) diff --git a/pb/pb.pb.go b/pb/pb.pb.go index 7a989a6c5..bfff39b7b 100644 --- a/pb/pb.pb.go +++ b/pb/pb.pb.go @@ -285,11 +285,12 @@ func (m *ManifestChangeSet) GetChanges() []*ManifestChange { } type ManifestChange struct { - Id uint64 `protobuf:"varint,1,opt,name=Id,json=id,proto3" json:"Id,omitempty"` - Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,json=op,proto3,enum=pb.ManifestChange_Operation" json:"Op,omitempty"` - Level uint32 `protobuf:"varint,3,opt,name=Level,json=level,proto3" json:"Level,omitempty"` + Id uint64 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` + Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,proto3,enum=pb.ManifestChange_Operation" json:"Op,omitempty"` + Level uint32 `protobuf:"varint,3,opt,name=Level,proto3" json:"Level,omitempty"` KeyId uint64 `protobuf:"varint,4,opt,name=key_id,json=keyId,proto3" json:"key_id,omitempty"` EncryptionAlgo EncryptionAlgo `protobuf:"varint,5,opt,name=encryption_algo,json=encryptionAlgo,proto3,enum=pb.EncryptionAlgo" json:"encryption_algo,omitempty"` + Compression uint32 `protobuf:"varint,6,opt,name=compression,proto3" json:"compression,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -363,6 +364,13 @@ func (m *ManifestChange) GetEncryptionAlgo() EncryptionAlgo { return EncryptionAlgo_aes } +func (m *ManifestChange) GetCompression() uint32 { + if m != nil { + return m.Compression + } + return 0 +} + type BlockOffset struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Offset uint32 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` @@ -624,45 +632,46 @@ func init() { func init() { proto.RegisterFile("pb.proto", fileDescriptor_f80abaa17e25ccc8) } var fileDescriptor_f80abaa17e25ccc8 = []byte{ - // 599 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xcd, 0x6e, 
0xd3, 0x40, - 0x10, 0xce, 0x3a, 0x8e, 0x93, 0x4c, 0xdb, 0xd4, 0xac, 0xa0, 0x32, 0x02, 0xa2, 0x60, 0x84, 0x14, - 0xaa, 0x2a, 0x87, 0x16, 0x71, 0xe1, 0x94, 0xa6, 0x41, 0x44, 0x69, 0x55, 0x69, 0xa9, 0xaa, 0x8a, - 0x4b, 0xb4, 0x89, 0x27, 0x8d, 0x65, 0xc7, 0x6b, 0x79, 0x37, 0x56, 0xf3, 0x26, 0xbc, 0x07, 0x2f, - 0xc1, 0x91, 0x17, 0x40, 0x42, 0xe5, 0x45, 0xd0, 0xae, 0x9d, 0xaa, 0x11, 0xdc, 0x66, 0xbe, 0x6f, - 0x76, 0x7e, 0xbe, 0x99, 0x85, 0x46, 0x3a, 0xed, 0xa5, 0x99, 0x50, 0x82, 0x5a, 0xe9, 0xd4, 0xff, - 0x4e, 0xc0, 0x1a, 0x5f, 0x53, 0x17, 0xaa, 0x11, 0xae, 0x3d, 0xd2, 0x21, 0xdd, 0x5d, 0xa6, 0x4d, - 0xfa, 0x14, 0x6a, 0x39, 0x8f, 0x57, 0xe8, 0x59, 0x06, 0x2b, 0x1c, 0xfa, 0x02, 0x9a, 0x2b, 0x89, - 0xd9, 0x64, 0x89, 0x8a, 0x7b, 0x55, 0xc3, 0x34, 0x34, 0x70, 0x81, 0x8a, 0x53, 0x0f, 0xea, 0x39, - 0x66, 0x32, 0x14, 0x89, 0x67, 0x77, 0x48, 0xd7, 0x66, 0x1b, 0x97, 0xbe, 0x02, 0xc0, 0xbb, 0x34, - 0xcc, 0x50, 0x4e, 0xb8, 0xf2, 0x6a, 0x86, 0x6c, 0x96, 0x48, 0x5f, 0x51, 0x0a, 0xb6, 0x49, 0xe8, - 0x98, 0x84, 0xc6, 0xd6, 0x95, 0xa4, 0xca, 0x90, 0x2f, 0x27, 0x61, 0xe0, 0x41, 0x87, 0x74, 0xf7, - 0x58, 0xa3, 0x00, 0x46, 0x81, 0xdf, 0x01, 0x67, 0x7c, 0x7d, 0x1e, 0x4a, 0x45, 0x0f, 0xc0, 0x8a, - 0x72, 0x8f, 0x74, 0xaa, 0xdd, 0x9d, 0x63, 0xa7, 0x97, 0x4e, 0x7b, 0xe3, 0x6b, 0x66, 0x45, 0xb9, - 0xdf, 0x87, 0x27, 0x17, 0x3c, 0x09, 0xe7, 0x28, 0xd5, 0x60, 0xc1, 0x93, 0x5b, 0xfc, 0x82, 0x8a, - 0x1e, 0x41, 0x7d, 0x66, 0x1c, 0x59, 0xbe, 0xa0, 0xfa, 0xc5, 0x76, 0x1c, 0xdb, 0x84, 0xf8, 0xbf, - 0x08, 0xb4, 0xb6, 0x39, 0xda, 0x02, 0x6b, 0x14, 0x18, 0x95, 0x6c, 0x66, 0x85, 0x01, 0x3d, 0x02, - 0xeb, 0x32, 0x35, 0x0a, 0xb5, 0x8e, 0x5f, 0xfe, 0x9b, 0xab, 0x77, 0x99, 0x62, 0xc6, 0x55, 0x28, - 0x12, 0x66, 0x89, 0x54, 0x4b, 0x7a, 0x8e, 0x39, 0xc6, 0x46, 0xb8, 0x3d, 0x56, 0x8b, 0xb5, 0x43, - 0x9f, 0x81, 0x13, 0xe1, 0x5a, 0x4f, 0x59, 0x88, 0x56, 0x8b, 0x70, 0x3d, 0x0a, 0xe8, 0x47, 0xd8, - 0xc7, 0x64, 0x96, 0xad, 0x53, 0xfd, 0x7c, 0xc2, 0xe3, 0x5b, 0x61, 0x74, 0x6b, 0x15, 0x3d, 0x0f, - 0x1f, 0xa8, 0x7e, 0x7c, 
0x2b, 0x58, 0x0b, 0xb7, 0x7c, 0xff, 0x0d, 0x34, 0x1f, 0x4a, 0x53, 0x00, - 0x67, 0xc0, 0x86, 0xfd, 0xab, 0xa1, 0x5b, 0xd1, 0xf6, 0xd9, 0xf0, 0x7c, 0x78, 0x35, 0x74, 0x89, - 0x3f, 0x82, 0x9d, 0xd3, 0x58, 0xcc, 0xa2, 0xcb, 0xf9, 0x5c, 0xa2, 0xfa, 0xcf, 0x09, 0x1c, 0x80, - 0x23, 0x0c, 0x67, 0x26, 0xdc, 0x63, 0xa5, 0xa7, 0x23, 0x63, 0x4c, 0xca, 0x29, 0xb4, 0xe9, 0x7f, - 0x05, 0xb8, 0xe2, 0xd3, 0x18, 0x47, 0x49, 0x80, 0x77, 0xf4, 0x1d, 0xd4, 0x8b, 0xc8, 0x8d, 0xcc, - 0xfb, 0xba, 0xe5, 0x47, 0xb5, 0xd8, 0x86, 0xa7, 0xaf, 0x61, 0x77, 0x1a, 0x0b, 0xb1, 0x9c, 0xcc, - 0xc3, 0x58, 0x61, 0x56, 0x1e, 0xdb, 0x8e, 0xc1, 0x3e, 0x19, 0xc8, 0x17, 0xd0, 0x18, 0x2c, 0x70, - 0x16, 0xc9, 0xd5, 0x92, 0x1e, 0x82, 0x6d, 0x94, 0x20, 0x46, 0x89, 0x03, 0x9d, 0x76, 0xc3, 0xf5, - 0xf4, 0xe0, 0x59, 0xa8, 0x16, 0x4b, 0x66, 0x62, 0x74, 0x97, 0x72, 0xb5, 0x34, 0x19, 0x6d, 0xa6, - 0x4d, 0xff, 0x2d, 0x34, 0x1f, 0x82, 0x0a, 0x55, 0x06, 0x27, 0xc7, 0x03, 0xb7, 0x42, 0x77, 0xa1, - 0x71, 0x73, 0xf3, 0x99, 0xcb, 0xc5, 0x87, 0xf7, 0x2e, 0xf1, 0x67, 0x50, 0x3f, 0xe3, 0x8a, 0x8f, - 0x71, 0xfd, 0x68, 0x37, 0xe4, 0xf1, 0x6e, 0x28, 0xd8, 0x01, 0x57, 0xbc, 0xec, 0xd6, 0xd8, 0xfa, - 0x34, 0xc2, 0xbc, 0xfc, 0x12, 0x56, 0x98, 0xeb, 0x93, 0x9f, 0x65, 0xc8, 0x15, 0x06, 0xfa, 0xe4, - 0xf5, 0x6a, 0xab, 0xac, 0x59, 0x22, 0x7d, 0x75, 0xf8, 0x1c, 0x5a, 0xdb, 0x3b, 0xa4, 0x75, 0xa8, - 0x72, 0x94, 0x6e, 0xe5, 0xd4, 0xfd, 0x71, 0xdf, 0x26, 0x3f, 0xef, 0xdb, 0xe4, 0xf7, 0x7d, 0x9b, - 0x7c, 0xfb, 0xd3, 0xae, 0x4c, 0x1d, 0xf3, 0x5f, 0x4f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0xf2, - 0x7a, 0x56, 0x42, 0xbb, 0x03, 0x00, 0x00, + // 611 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0xdd, 0x6e, 0x12, 0x41, + 0x14, 0x66, 0x16, 0xba, 0xc0, 0xa1, 0xa5, 0xeb, 0x44, 0x9b, 0x35, 0x2a, 0xc1, 0x35, 0x26, 0xd8, + 0x34, 0x5c, 0xb4, 0xc6, 0x1b, 0xaf, 0x28, 0xc5, 0x48, 0x68, 0x43, 0x32, 0x36, 0x4d, 0xe3, 0x0d, + 0x19, 0x76, 0x0f, 0x65, 0xb3, 0xbf, 0xd9, 0x19, 0x36, 0xe5, 0x4d, 
0x7c, 0x0f, 0x5f, 0xc2, 0x4b, + 0x1f, 0xc1, 0xd4, 0x07, 0xd1, 0xcc, 0xec, 0xd2, 0x40, 0xf4, 0xee, 0x9c, 0xef, 0x3b, 0x73, 0xe6, + 0xcc, 0xf7, 0xcd, 0x81, 0x46, 0x3a, 0xef, 0xa7, 0x59, 0x22, 0x13, 0x6a, 0xa4, 0x73, 0xe7, 0x3b, + 0x01, 0x63, 0x72, 0x43, 0x2d, 0xa8, 0x06, 0xb8, 0xb6, 0x49, 0x97, 0xf4, 0xf6, 0x99, 0x0a, 0xe9, + 0x53, 0xd8, 0xcb, 0x79, 0xb8, 0x42, 0xdb, 0xd0, 0x58, 0x91, 0xd0, 0x17, 0xd0, 0x5c, 0x09, 0xcc, + 0x66, 0x11, 0x4a, 0x6e, 0x57, 0x35, 0xd3, 0x50, 0xc0, 0x15, 0x4a, 0x4e, 0x6d, 0xa8, 0xe7, 0x98, + 0x09, 0x3f, 0x89, 0xed, 0x5a, 0x97, 0xf4, 0x6a, 0x6c, 0x93, 0xd2, 0x57, 0x00, 0x78, 0x9f, 0xfa, + 0x19, 0x8a, 0x19, 0x97, 0xf6, 0x9e, 0x26, 0x9b, 0x25, 0x32, 0x90, 0x94, 0x42, 0x4d, 0x37, 0x34, + 0x75, 0x43, 0x1d, 0xab, 0x9b, 0x84, 0xcc, 0x90, 0x47, 0x33, 0xdf, 0xb3, 0xa1, 0x4b, 0x7a, 0x07, + 0xac, 0x51, 0x00, 0x63, 0xcf, 0xe9, 0x82, 0x39, 0xb9, 0xb9, 0xf4, 0x85, 0xa4, 0x47, 0x60, 0x04, + 0xb9, 0x4d, 0xba, 0xd5, 0x5e, 0xeb, 0xd4, 0xec, 0xa7, 0xf3, 0xfe, 0xe4, 0x86, 0x19, 0x41, 0xee, + 0x0c, 0xe0, 0xc9, 0x15, 0x8f, 0xfd, 0x05, 0x0a, 0x39, 0x5c, 0xf2, 0xf8, 0x0e, 0xbf, 0xa0, 0xa4, + 0x27, 0x50, 0x77, 0x75, 0x22, 0xca, 0x13, 0x54, 0x9d, 0xd8, 0xad, 0x63, 0x9b, 0x12, 0xe7, 0x0f, + 0x81, 0xf6, 0x2e, 0x47, 0xdb, 0x60, 0x8c, 0x3d, 0xad, 0x52, 0x8d, 0x19, 0x63, 0x8f, 0x9e, 0x80, + 0x31, 0x4d, 0xb5, 0x42, 0xed, 0xd3, 0x97, 0xff, 0xf6, 0xea, 0x4f, 0x53, 0xcc, 0xb8, 0xf4, 0x93, + 0x98, 0x19, 0xd3, 0x54, 0x49, 0x7a, 0x89, 0x39, 0x86, 0x5a, 0xb8, 0x03, 0x56, 0x24, 0xf4, 0x19, + 0x98, 0x01, 0xae, 0xd5, 0x2b, 0x0b, 0xd1, 0xf6, 0x02, 0x5c, 0x8f, 0x3d, 0xfa, 0x11, 0x0e, 0x31, + 0x76, 0xb3, 0x75, 0xaa, 0x8e, 0xcf, 0x78, 0x78, 0x97, 0x68, 0xdd, 0xda, 0xc5, 0xcc, 0xa3, 0x47, + 0x6a, 0x10, 0xde, 0x25, 0xac, 0x8d, 0x3b, 0x39, 0xed, 0x42, 0xcb, 0x4d, 0xa2, 0x34, 0x43, 0xa1, + 0xdd, 0x30, 0xf5, 0x7d, 0xdb, 0x90, 0xf3, 0x06, 0x9a, 0x8f, 0xc3, 0x51, 0x00, 0x73, 0xc8, 0x46, + 0x83, 0xeb, 0x91, 0x55, 0x51, 0xf1, 0xc5, 0xe8, 0x72, 0x74, 0x3d, 0xb2, 0x88, 0x33, 0x86, 0xd6, + 0x79, 
0x98, 0xb8, 0xc1, 0x74, 0xb1, 0x10, 0x28, 0xff, 0xf3, 0x49, 0x8e, 0xc0, 0x4c, 0x34, 0xa7, + 0x35, 0x38, 0x60, 0x65, 0xa6, 0x2a, 0x43, 0x8c, 0xcb, 0x77, 0xaa, 0xd0, 0xf9, 0x0a, 0x70, 0xcd, + 0xe7, 0x21, 0x8e, 0x63, 0x0f, 0xef, 0xe9, 0x3b, 0xa8, 0x17, 0x95, 0x1b, 0x23, 0x0e, 0xd5, 0xa3, + 0xb6, 0xee, 0x62, 0x1b, 0x9e, 0xbe, 0x86, 0xfd, 0x79, 0x98, 0x24, 0xd1, 0x6c, 0xe1, 0x87, 0x12, + 0xb3, 0xf2, 0x3b, 0xb6, 0x34, 0xf6, 0x49, 0x43, 0x4e, 0x02, 0x8d, 0xe1, 0x12, 0xdd, 0x40, 0xac, + 0x22, 0x7a, 0x0c, 0x35, 0xad, 0x15, 0xd1, 0x5a, 0x1d, 0xa9, 0xb6, 0x1b, 0xae, 0xaf, 0xa4, 0xc9, + 0x7c, 0xb9, 0x8c, 0x98, 0xae, 0x51, 0x53, 0x8a, 0x55, 0xa4, 0x3b, 0xd6, 0x98, 0x0a, 0x9d, 0xb7, + 0xd0, 0x7c, 0x2c, 0x2a, 0x54, 0x19, 0x9e, 0x9d, 0x0e, 0xad, 0x0a, 0xdd, 0x87, 0xc6, 0xed, 0xed, + 0x67, 0x2e, 0x96, 0x1f, 0xde, 0x5b, 0xc4, 0x71, 0xa1, 0x7e, 0xc1, 0x25, 0x9f, 0xe0, 0x7a, 0xcb, + 0x3d, 0xb2, 0xed, 0x1e, 0x85, 0x9a, 0xc7, 0x25, 0x2f, 0xa7, 0xd5, 0xb1, 0xfa, 0x3c, 0x7e, 0x5e, + 0x2e, 0x8d, 0xe1, 0xe7, 0x6a, 0x29, 0xdc, 0x0c, 0xb9, 0x44, 0x4f, 0x2d, 0x85, 0x32, 0xbf, 0xca, + 0x9a, 0x25, 0x32, 0x90, 0xc7, 0xcf, 0xa1, 0xbd, 0xeb, 0x32, 0xad, 0x43, 0x95, 0xa3, 0xb0, 0x2a, + 0xe7, 0xd6, 0x8f, 0x87, 0x0e, 0xf9, 0xf9, 0xd0, 0x21, 0xbf, 0x1e, 0x3a, 0xe4, 0xdb, 0xef, 0x4e, + 0x65, 0x6e, 0xea, 0x8d, 0x3e, 0xfb, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x1d, 0x2f, 0x21, 0x79, 0xdd, + 0x03, 0x00, 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -831,6 +840,11 @@ func (m *ManifestChange) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintPb(dAtA, i, uint64(m.EncryptionAlgo)) } + if m.Compression != 0 { + dAtA[i] = 0x30 + i++ + i = encodeVarintPb(dAtA, i, uint64(m.Compression)) + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -1090,6 +1104,9 @@ func (m *ManifestChange) Size() (n int) { if m.EncryptionAlgo != 0 { n += 1 + sovPb(uint64(m.EncryptionAlgo)) } + if m.Compression != 0 { + n += 1 + sovPb(uint64(m.Compression)) + } if m.XXX_unrecognized != nil { n += 
len(m.XXX_unrecognized) } @@ -1744,6 +1761,25 @@ func (m *ManifestChange) Unmarshal(dAtA []byte) error { break } } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Compression", wireType) + } + m.Compression = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPb + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Compression |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipPb(dAtA[iNdEx:]) diff --git a/pb/pb.proto b/pb/pb.proto index 1c8d9c587..1edb0edc4 100644 --- a/pb/pb.proto +++ b/pb/pb.proto @@ -36,8 +36,8 @@ message KVList { } message ManifestChangeSet { - // A set of changes that are applied atomically. - repeated ManifestChange changes = 1; + // A set of changes that are applied atomically. + repeated ManifestChange changes = 1; } enum EncryptionAlgo { @@ -45,15 +45,16 @@ enum EncryptionAlgo { } message ManifestChange { - uint64 Id = 1; - enum Operation { - CREATE = 0; - DELETE = 1; - } - Operation Op = 2; - uint32 Level = 3; // Only used for CREATE - uint64 key_id = 4; - EncryptionAlgo encryption_algo = 5; + uint64 Id = 1; // Table ID. + enum Operation { + CREATE = 0; + DELETE = 1; + } + Operation Op = 2; + uint32 Level = 3; // Only used for CREATE. + uint64 key_id = 4; + EncryptionAlgo encryption_algo = 5; + uint32 compression = 6; // Only used for CREATE Op. 
} message BlockOffset { diff --git a/stream_writer.go b/stream_writer.go index f239e3f8b..b3ab8f141 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -219,11 +219,8 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { return nil, err } - bopts := table.Options{ - BlockSize: sw.db.opt.BlockSize, - BloomFalsePositive: sw.db.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(sw.db.opt) + bopts.DataKey = dk w := &sortedWriter{ db: sw.db, streamID: streamID, @@ -317,11 +314,8 @@ func (w *sortedWriter) send() error { if err != nil { return y.Wrapf(err, "Error while retriving datakey in sortedWriter.send") } - bopts := table.Options{ - BlockSize: w.db.opt.BlockSize, - BloomFalsePositive: w.db.opt.BloomFalsePositive, - DataKey: dk, - } + bopts := buildTableOptions(w.db.opt) + bopts.DataKey = dk w.builder = table.NewTableBuilder(bopts) return nil } @@ -348,12 +342,8 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { if _, err := fd.Write(data); err != nil { return err } - - opts := table.Options{ - LoadingMode: w.db.opt.TableLoadingMode, - ChkMode: w.db.opt.ChecksumVerificationMode, - DataKey: builder.DataKey(), - } + opts := buildTableOptions(w.db.opt) + opts.DataKey = builder.DataKey() tbl, err := table.OpenTable(fd, opts) if err != nil { return err @@ -384,9 +374,10 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { } // Now that table can be opened successfully, let's add this to the MANIFEST. 
change := &pb.ManifestChange{ - Id: tbl.ID(), - Op: pb.ManifestChange_CREATE, - Level: uint32(lhandler.level), + Id: tbl.ID(), + Op: pb.ManifestChange_CREATE, + Level: uint32(lhandler.level), + Compression: uint32(tbl.CompressionType()), } if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil { return err diff --git a/stream_writer_test.go b/stream_writer_test.go index 5e252ea32..f361f4b1b 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -323,7 +323,6 @@ func TestStreamWriter6(t *testing.T) { } } require.NoError(t, db.Close()) - db, err := Open(db.opt) require.NoError(t, err) require.NoError(t, db.Close()) diff --git a/table/builder.go b/table/builder.go index 0f8c06f77..bd22e4318 100644 --- a/table/builder.go +++ b/table/builder.go @@ -22,9 +22,13 @@ import ( "math" "unsafe" + "github.com/DataDog/zstd" "github.com/dgryski/go-farm" "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "github.com/pkg/errors" + "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/y" "github.com/dgraph-io/ristretto/z" @@ -150,6 +154,18 @@ func (b *Builder) finishBlock() { blockBuf := b.buf.Bytes()[b.baseOffset:] // Store checksum for current block. b.writeChecksum(blockBuf) + // Compress the block. + if b.opt.Compression != options.None { + var err error + // TODO: Find a way to reuse buffers. Current implementation creates a + // new buffer for each compressData call. + blockBuf, err = b.compressData(b.buf.Bytes()[b.baseOffset:]) + y.Check(err) + // Truncate already written data. + b.buf.Truncate(int(b.baseOffset)) + // Write compressed data. + b.buf.Write(blockBuf) + } if b.shouldEncrypt() { block := b.buf.Bytes()[b.baseOffset:] eBlock, err := b.encrypt(block) @@ -322,3 +338,16 @@ func (b *Builder) encrypt(data []byte) ([]byte, error) { func (b *Builder) shouldEncrypt() bool { return b.opt.DataKey != nil } + +// compressData compresses the given data. 
+func (b *Builder) compressData(data []byte) ([]byte, error) { + switch b.opt.Compression { + case options.None: + return data, nil + case options.Snappy: + return snappy.Encode(nil, data), nil + case options.ZSTD: + return zstd.Compress(nil, data) + } + return nil, errors.New("Unsupported compression type") +} diff --git a/table/builder_test.go b/table/builder_test.go index 420967af6..6fcc6dc59 100644 --- a/table/builder_test.go +++ b/table/builder_test.go @@ -34,8 +34,8 @@ func TestTableIndex(t *testing.T) { rand.Seed(time.Now().Unix()) keyPrefix := "key" t.Run("single key", func(t *testing.T) { - f := buildTestTable(t, keyPrefix, 1) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := Options{Compression: options.ZSTD} + f := buildTestTable(t, keyPrefix, 1, opts) tbl, err := OpenTable(f, opts) require.NoError(t, err) require.Len(t, tbl.blockIndex, 1) @@ -51,6 +51,9 @@ func TestTableIndex(t *testing.T) { require.NoError(t, err) opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, DataKey: &pb.DataKey{Data: key}}) + // Compression mode. + opts = append(opts, Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01, + Compression: options.ZSTD}) keysCount := 10000 for _, opt := range opts { builder := NewTableBuilder(opt) @@ -76,10 +79,8 @@ func TestTableIndex(t *testing.T) { _, err = f.Write(builder.Finish()) require.NoError(t, err, "unable to write to file") - topt := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead, - DataKey: opt.DataKey} - tbl, err := OpenTable(f, topt) - if topt.DataKey == nil { + tbl, err := OpenTable(f, opt) + if opt.DataKey == nil { // key id is zero if there is no datakey. 
require.Equal(t, tbl.KeyID(), uint64(0)) } @@ -96,6 +97,22 @@ func TestTableIndex(t *testing.T) { }) } +func TestInvalidCompression(t *testing.T) { + keyPrefix := "key" + opts := Options{Compression: options.ZSTD} + f := buildTestTable(t, keyPrefix, 1000, opts) + t.Run("with correct decompression algo", func(t *testing.T) { + _, err := OpenTable(f, opts) + require.NoError(t, err) + }) + t.Run("with incorrect decompression algo", func(t *testing.T) { + // Set incorrect compression algorithm. + opts.Compression = options.Snappy + _, err := OpenTable(f, opts) + require.Error(t, err) + }) +} + func BenchmarkBuilder(b *testing.B) { rand.Seed(time.Now().Unix()) key := func(i int) []byte { diff --git a/table/table.go b/table/table.go index f09f5bd9c..7c0b89118 100644 --- a/table/table.go +++ b/table/table.go @@ -28,7 +28,9 @@ import ( "sync" "sync/atomic" + "github.com/DataDog/zstd" "github.com/golang/protobuf/proto" + "github.com/golang/snappy" "github.com/pkg/errors" "github.com/dgraph-io/badger/options" @@ -59,6 +61,9 @@ type Options struct { // DataKey is the key used to decrypt the encrypted text. DataKey *pb.DataKey + + // Compression indicates the compression algorithm used for block compression. + Compression options.CompressionType } // TableInterface is useful for testing. @@ -91,6 +96,11 @@ type Table struct { opt *Options } +// CompressionType returns the compression algorithm used for block compression. +func (t *Table) CompressionType() options.CompressionType { + return t.opt.Compression +} + // IncrRef increments the refcount (having to do with whether the file should be deleted) func (t *Table) IncrRef() { atomic.AddInt32(&t.ref, 1) @@ -219,10 +229,11 @@ func OpenTable(fd *os.File, opts Options) (*Table, error) { // OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data. // OpenInMemoryTable is used for L0 tables. 
-func OpenInMemoryTable(data []byte, id uint64, dk *pb.DataKey) (*Table, error) { +func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) { + opt.LoadingMode = options.LoadToRAM t := &Table{ ref: 1, // Caller is given one reference. - opt: &Options{LoadingMode: options.LoadToRAM, DataKey: dk}, + opt: opt, mmap: data, tableSize: len(data), IsInmemory: true, @@ -245,9 +256,10 @@ func (t *Table) initBiggestAndSmallest() error { it2 := t.NewIterator(true) defer it2.Close() it2.Rewind() - if it2.Valid() { - t.biggest = it2.Key() + if !it2.Valid() { + return errors.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename()) } + t.biggest = it2.Key() return nil } @@ -343,7 +355,8 @@ func (t *Table) block(idx int) (*block, error) { } var err error if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil { - return nil, err + return nil, errors.Wrapf(err, + "failed to read from file: %s at offset: %d, len: %d", t.fd.Name(), blk.offset, ko.Len) } if t.shouldDecrypt() { @@ -353,10 +366,24 @@ func (t *Table) block(idx int) (*block, error) { } } + blk.data, err = t.decompressData(blk.data) + if err != nil { + return nil, errors.Wrapf(err, + "failed to decode compressed data in file: %s at offset: %d, len: %d", + t.fd.Name(), blk.offset, ko.Len) + } + // Read meta data related to block. readPos := len(blk.data) - 4 // First read checksum length. blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4])) + // Checksum length greater than block size could happen if the table was compressed and + // it was opened with an incorrect compression algorithm (or the data was corrupted). + if blk.chkLen > len(blk.data) { + return nil, errors.New("invalid checksum length. 
Either the data is " + + "corrupted or the table options are incorrectly set") + } + + // Read checksum and store it readPos -= blk.chkLen blk.checksum = blk.data[readPos : readPos+blk.chkLen] @@ -476,3 +503,16 @@ func IDToFilename(id uint64) string { func NewFilename(id uint64, dir string) string { return filepath.Join(dir, IDToFilename(id)) } + +// decompressData decompresses the given data. +func (t *Table) decompressData(data []byte) ([]byte, error) { + switch t.opt.Compression { + case options.None: + return data, nil + case options.Snappy: + return snappy.Decode(nil, data) + case options.ZSTD: + return zstd.Decompress(nil, data) + } + return nil, errors.New("Unsupported compression type") +} diff --git a/table/table_test.go b/table/table_test.go index c78855474..98c90fec4 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -43,7 +43,23 @@ func key(prefix string, i int) string { return prefix + fmt.Sprintf("%04d", i) } -func buildTestTable(t *testing.T, prefix string, n int) *os.File { +func getTestTableOptions() Options { + return Options{ + Compression: options.ZSTD, + LoadingMode: options.LoadToRAM, + ChkMode: options.OnTableAndBlockRead, + BlockSize: 4 * 1024, + BloomFalsePositive: 0.01, + } + +} +func buildTestTable(t *testing.T, prefix string, n int, opts Options) *os.File { + if opts.BloomFalsePositive == 0 { + opts.BloomFalsePositive = 0.01 + } + if opts.BlockSize == 0 { + opts.BlockSize = 4 * 1024 + } y.AssertTrue(n <= 10000) keyValues := make([][]string, n) for i := 0; i < n; i++ { @@ -51,12 +67,11 @@ func buildTestTable(t *testing.T, prefix string, n int) *os.File { v := fmt.Sprintf("%d", i) keyValues[i] = []string{k, v} } - return buildTable(t, keyValues) + return buildTable(t, keyValues, opts) } // keyValues is n by 2 where n is number of pairs. 
-func buildTable(t *testing.T, keyValues [][]string) *os.File { - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} +func buildTable(t *testing.T, keyValues [][]string, opts Options) *os.File { b := NewTableBuilder(opts) defer b.Close() // TODO: Add test for file garbage collection here. No files should be left after the tests here. @@ -86,8 +101,8 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File { func TestTableIterator(t *testing.T) { for _, n := range []int{99, 100, 101} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -109,8 +124,8 @@ func TestTableIterator(t *testing.T) { func TestSeekToFirst(t *testing.T) { for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -128,8 +143,8 @@ func TestSeekToFirst(t *testing.T) { func TestSeekToLast(t *testing.T) { for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -150,8 +165,8 @@ func TestSeekToLast(t *testing.T) { } func TestSeek(t *testing.T) { - f := buildTestTable(t, "k", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: 
options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "k", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -186,8 +201,8 @@ func TestSeek(t *testing.T) { } func TestSeekForPrev(t *testing.T) { - f := buildTestTable(t, "k", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "k", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -225,8 +240,8 @@ func TestIterateFromStart(t *testing.T) { // Vary the number of elements added. for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -253,8 +268,8 @@ func TestIterateFromEnd(t *testing.T) { // Vary the number of elements added. 
for _, n := range []int{99, 100, 101, 199, 200, 250, 9999, 10000} { t.Run(fmt.Sprintf("n=%d", n), func(t *testing.T) { - f := buildTestTable(t, "key", n) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", n, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -277,8 +292,9 @@ func TestIterateFromEnd(t *testing.T) { } func TestTable(t *testing.T) { - f := buildTestTable(t, "key", 10000) - opts := Options{LoadingMode: options.FileIO, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + opts.LoadingMode = options.FileIO + f := buildTestTable(t, "key", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -305,8 +321,8 @@ func TestTable(t *testing.T) { } func TestIterateBackAndForth(t *testing.T) { - f := buildTestTable(t, "key", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -347,8 +363,8 @@ func TestIterateBackAndForth(t *testing.T) { } func TestUniIterator(t *testing.T) { - f := buildTestTable(t, "key", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "key", 10000, opts) table, err := OpenTable(f, opts) require.NoError(t, err) defer table.DecrRef() @@ -380,12 +396,12 @@ func TestUniIterator(t *testing.T) { // Try having only one table. 
func TestConcatIteratorOneTable(t *testing.T) { + opts := getTestTableOptions() f := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} tbl, err := OpenTable(f, opts) require.NoError(t, err) defer tbl.DecrRef() @@ -403,14 +419,13 @@ func TestConcatIteratorOneTable(t *testing.T) { } func TestConcatIterator(t *testing.T) { - f := buildTestTable(t, "keya", 10000) - f2 := buildTestTable(t, "keyb", 10000) - f3 := buildTestTable(t, "keyc", 10000) - opts := Options{LoadingMode: options.MemoryMap, ChkMode: options.OnTableAndBlockRead} + opts := getTestTableOptions() + f := buildTestTable(t, "keya", 10000, opts) + f2 := buildTestTable(t, "keyb", 10000, opts) + f3 := buildTestTable(t, "keyc", 10000, opts) tbl, err := OpenTable(f, opts) require.NoError(t, err) defer tbl.DecrRef() - opts.LoadingMode = options.LoadToRAM tbl2, err := OpenTable(f2, opts) require.NoError(t, err) defer tbl2.DecrRef() @@ -485,15 +500,15 @@ func TestConcatIterator(t *testing.T) { } func TestMergingIterator(t *testing.T) { + opts := getTestTableOptions() f1 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) f2 := buildTable(t, [][]string{ {"k1", "b1"}, {"k2", "b2"}, - }) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} + }, opts) tbl1, err := OpenTable(f1, opts) require.NoError(t, err) defer tbl1.DecrRef() @@ -526,15 +541,15 @@ func TestMergingIterator(t *testing.T) { } func TestMergingIteratorReversed(t *testing.T) { + opts := getTestTableOptions() f1 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) f2 := buildTable(t, [][]string{ {"k1", "b1"}, {"k2", "b2"}, - }) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} + }, opts) tbl1, err := OpenTable(f1, opts) require.NoError(t, err) defer tbl1.DecrRef() @@ -568,13 +583,13 @@ func TestMergingIteratorReversed(t *testing.T) { // 
Take only the first iterator. func TestMergingIteratorTakeOne(t *testing.T) { + opts := getTestTableOptions() f1 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) - f2 := buildTable(t, [][]string{}) + }, opts) + f2 := buildTable(t, [][]string{{"l1", "b1"}}, opts) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} t1, err := OpenTable(f1, opts) require.NoError(t, err) defer t1.DecrRef() @@ -604,18 +619,25 @@ func TestMergingIteratorTakeOne(t *testing.T) { require.EqualValues(t, 'A', vs.Meta) it.Next() + k = it.Key() + require.EqualValues(t, "l1", string(y.ParseKey(k))) + vs = it.Value() + require.EqualValues(t, "b1", string(vs.Value)) + require.EqualValues(t, 'A', vs.Meta) + it.Next() + require.False(t, it.Valid()) } // Take only the second iterator. func TestMergingIteratorTakeTwo(t *testing.T) { - f1 := buildTable(t, [][]string{}) + opts := getTestTableOptions() + f1 := buildTable(t, [][]string{{"l1", "b1"}}, opts) f2 := buildTable(t, [][]string{ {"k1", "a1"}, {"k2", "a2"}, - }) + }, opts) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} t1, err := OpenTable(f1, opts) require.NoError(t, err) defer t1.DecrRef() @@ -644,6 +666,15 @@ func TestMergingIteratorTakeTwo(t *testing.T) { require.EqualValues(t, "a2", string(vs.Value)) require.EqualValues(t, 'A', vs.Meta) it.Next() + require.True(t, it.Valid()) + + k = it.Key() + require.EqualValues(t, "l1", string(y.ParseKey(k))) + vs = it.Value() + require.EqualValues(t, "b1", string(vs.Value)) + require.EqualValues(t, 'A', vs.Meta) + it.Next() + require.False(t, it.Valid()) } @@ -658,7 +689,7 @@ func TestTableBigValues(t *testing.T) { require.NoError(t, err, "unable to create file") n := 100 // Insert 100 keys. 
- opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} builder := NewTableBuilder(opts) for i := 0; i < n; i++ { key := y.KeyWithTs([]byte(key("", i)), 0) @@ -668,7 +699,6 @@ func TestTableBigValues(t *testing.T) { _, err = f.Write(builder.Finish()) require.NoError(t, err, "unable to write to file") - opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} tbl, err := OpenTable(f, opts) require.NoError(t, err, "unable to open table") defer tbl.DecrRef() @@ -692,12 +722,12 @@ func TestTableChecksum(t *testing.T) { // we are going to write random byte at random location in table file. rb := make([]byte, 1) rand.Read(rb) - f := buildTestTable(t, "k", 10000) + opts := getTestTableOptions() + f := buildTestTable(t, "k", 10000, opts) fi, err := f.Stat() require.NoError(t, err, "unable to get file information") f.WriteAt(rb, rand.Int63n(fi.Size())) - opts := Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} _, err = OpenTable(f, opts) if err == nil || !strings.Contains(err.Error(), "checksum") { t.Fatal("Test should have been failed with checksum mismatch error") @@ -730,7 +760,7 @@ func BenchmarkReadAndBuild(b *testing.B) { // Iterate b.N times over the entire table. 
for i := 0; i < b.N; i++ { func() { - opts := Options{BlockSize: 4 * 0124, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 0124, BloomFalsePositive: 0.01} newBuilder := NewTableBuilder(opts) it := tbl.NewIterator(false) defer it.Close() @@ -751,7 +781,7 @@ func BenchmarkReadMerged(b *testing.B) { var tables []*Table for i := 0; i < m; i++ { filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} builder := NewTableBuilder(opts) f, err := y.OpenSyncedFile(filename, true) y.Check(err) @@ -764,7 +794,6 @@ func BenchmarkReadMerged(b *testing.B) { } _, err = f.Write(builder.Finish()) require.NoError(b, err, "unable to write to file") - opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead} tbl, err := OpenTable(f, opts) y.Check(err) tables = append(tables, tbl) @@ -838,7 +867,7 @@ func BenchmarkRandomRead(b *testing.B) { func getTableForBenchmarks(b *testing.B, count int) *Table { rand.Seed(time.Now().Unix()) - opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01} + opts := Options{Compression: options.ZSTD, BlockSize: 4 * 1024, BloomFalsePositive: 0.01} builder := NewTableBuilder(opts) filename := fmt.Sprintf("%s%s%d.sst", os.TempDir(), string(os.PathSeparator), rand.Int63()) f, err := y.OpenSyncedFile(filename, true) @@ -851,7 +880,6 @@ func getTableForBenchmarks(b *testing.B, count int) *Table { _, err = f.Write(builder.Finish()) require.NoError(b, err, "unable to write to file") - opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification} tbl, err := OpenTable(f, opts) require.NoError(b, err, "unable to open table") return tbl