diff --git a/backup.go b/backup.go index 3c6242167..1c0032401 100644 --- a/backup.go +++ b/backup.go @@ -116,11 +116,17 @@ func (stream *Stream) Backup(w io.Writer, since uint64) (uint64, error) { var maxVersion uint64 stream.Send = func(list *pb.KVList) error { + out := list.Kv[:0] for _, kv := range list.Kv { if maxVersion < kv.Version { maxVersion = kv.Version } + if !kv.StreamDone { + // Don't pick stream done changes. + out = append(out, kv) + } } + list.Kv = out return writeTo(list, w) } diff --git a/badger/cmd/stream.go b/badger/cmd/stream.go index 05c6cf14e..da6ee875a 100644 --- a/badger/cmd/stream.go +++ b/badger/cmd/stream.go @@ -17,6 +17,7 @@ package cmd import ( + "fmt" "io" "math" "os" @@ -53,6 +54,8 @@ func init() { streamCmd.Flags().Uint32VarP(&compressionType, "compression", "", 0, "Option to configure the compression type in output DB. "+ "0 to disable, 1 for Snappy, and 2 for ZSTD.") + streamCmd.Flags().StringVarP(&keyPath, "encryption-key-file", "e", "", + "Path of the encryption key file.") } func stream(cmd *cobra.Command, args []string) error { @@ -74,23 +77,37 @@ func stream(cmd *cobra.Command, args []string) error { if numVersions <= 0 { numVersions = math.MaxInt32 } + encKey, err := getKey(keyPath) + if err != nil { + return err + } inOpt := badger.DefaultOptions(sstDir). WithReadOnly(readOnly). WithValueThreshold(1 << 10 /* 1KB */). - WithNumVersionsToKeep(numVersions) + WithNumVersionsToKeep(numVersions). + WithBlockCacheSize(100 << 20). + WithIndexCacheSize(200 << 20). + WithEncryptionKey(encKey) // Options for output DB. if compressionType < 0 || compressionType > 2 { return errors.Errorf( "compression value must be one of 0 (disabled), 1 (Snappy), or 2 (ZSTD)") } - outOpt := inOpt.WithDir(outDir).WithValueDir(outDir). - WithCompression(options.CompressionType(compressionType)).WithReadOnly(false) + outOpt := inOpt. + WithDir(outDir). + WithValueDir(outDir). + WithNumVersionsToKeep(numVersions). + WithCompression(options.CompressionType(compressionType)). + WithReadOnly(false) inDB, err := badger.OpenManaged(inOpt) if err != nil { return y.Wrapf(err, "cannot open DB at %s", sstDir) } defer inDB.Close() - return inDB.StreamDB(outOpt) + + err = inDB.StreamDB(outOpt) + fmt.Println("Done.") + return err } diff --git a/badger/main.go b/badger/main.go index 1d6bcd65e..43fe27cf9 100644 --- a/badger/main.go +++ b/badger/main.go @@ -23,6 +23,8 @@ import ( "runtime" "github.com/dgraph-io/badger/v2/badger/cmd" + "github.com/dgraph-io/ristretto/z" + "github.com/dustin/go-humanize" ) func main() { @@ -38,5 +40,16 @@ func main() { }() runtime.SetBlockProfileRate(100) runtime.GOMAXPROCS(128) + + out := z.CallocNoRef(1) + fmt.Printf("jemalloc enabled: %v\n", len(out) > 0) + z.StatsPrint() + z.Free(out) + cmd.Execute() + fmt.Printf("Num Allocated Bytes at program end: %s\n", + humanize.IBytes(uint64(z.NumAllocBytes()))) + if z.NumAllocBytes() > 0 { + z.PrintLeaks() + } } diff --git a/go.mod b/go.mod index a0a0fb988..3208b4b91 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b + github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index ccd9e6fb0..c64123ee0 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,10 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b h1:CfEwAh85rqfhhE/DrDCbFdcwVpZ8ESnFRhLatiKdDYM= -github.com/dgraph-io/ristretto v0.0.4-0.20201007164332-9739cfa2564b/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= +github.com/dgraph-io/ristretto v0.0.4-0.20201012224315-0af15dd47cb9 h1:/E/ew7/iVTZnsWhUGi+dNpcWl+4D6FfojPmqX7nghWI= +github.com/dgraph-io/ristretto v0.0.4-0.20201012224315-0af15dd47cb9/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= +github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae h1:yh5085twGpsgfuu56DXKOM3SKyZKQPskJIoMNb3jzos= +github.com/dgraph-io/ristretto v0.0.4-0.20201013194302-6d6fac64beae/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/pb/badgerpb2.pb.go b/pb/badgerpb2.pb.go index ba6f93b41..6da2cf853 100644 --- a/pb/badgerpb2.pb.go +++ b/pb/badgerpb2.pb.go @@ -200,7 +200,9 @@ func (m *KV) GetStreamDone() bool { } type KVList struct { - Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` + Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` + // alloc_ref used internally for memory management. + AllocRef uint64 `protobuf:"varint,10,opt,name=alloc_ref,json=allocRef,proto3" json:"alloc_ref,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -246,6 +248,13 @@ func (m *KVList) GetKv() []*KV { return nil } +func (m *KVList) GetAllocRef() uint64 { + if m != nil { + return m.AllocRef + } + return 0 +} + type ManifestChangeSet struct { // A set of changes that are applied atomically. Changes []*ManifestChange `protobuf:"bytes,1,rep,name=changes,proto3" json:"changes,omitempty"` @@ -522,44 +531,45 @@ func init() { func init() { proto.RegisterFile("badgerpb2.proto", fileDescriptor_e63e84f9f0d3998c) } var fileDescriptor_e63e84f9f0d3998c = []byte{ - // 589 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x93, 0xcf, 0x6e, 0xda, 0x4e, - 0x10, 0xc7, 0x59, 0xe3, 0xf0, 0x67, 0x48, 0x08, 0xbf, 0xd5, 0xaf, 0x92, 0xa3, 0x2a, 0x94, 0x3a, - 0xaa, 0x8a, 0x2a, 0x15, 0x54, 0xa8, 0x7a, 0x27, 0x04, 0x29, 0x88, 0x44, 0x91, 0xb6, 0x51, 0x14, - 0xf5, 0x82, 0x16, 0x7b, 0x0a, 0x16, 0xf8, 0x8f, 0x76, 0x17, 0xab, 0x3c, 0x44, 0xef, 0x7d, 0xa4, - 0x1e, 0x7b, 0xe8, 0x03, 0x54, 0xe9, 0x8b, 0x54, 0xbb, 0x36, 0x14, 0x0e, 0xbd, 0xcd, 0x7c, 0xe7, - 0xbb, 0x3b, 0xe3, 0xcf, 0x8e, 0xe1, 0x74, 0xc6, 0xfd, 0x39, 0x8a, 0x64, 0xd6, 0xeb, 0x24, 0x22, - 0x56, 0x31, 0xad, 0xee, 0x04, 0xf7, 0x27, 0x01, 0x6b, 0xf2, 0x40, 0x1b, 0x50, 0x5c, 0xe2, 0xc6, - 0x21, 0x2d, 0xd2, 0x3e, 0x66, 0x3a, 0xa4, 0xff, 0xc3, 0x51, 0xca, 0x57, 0x6b, 0x74, 0x2c, 0xa3, - 0x65, 0x09, 0x7d, 0x0e, 0xd5, 0xb5, 0x44, 0x31, 0x0d, 0x51, 0x71, 0xa7, 0x68, 0x2a, 0x15, 0x2d, - 0xdc, 0xa2, 0xe2, 0xd4, 0x81, 0x72, 0x8a, 0x42, 0x06, 0x71, 0xe4, 0xd8, 0x2d, 0xd2, 0xb6, 0xd9, - 0x36, 0xa5, 0xe7, 0x00, 0xf8, 0x25, 0x09, 0x04, 0xca, 0x29, 0x57, 0xce, 0x91, 0x29, 0x56, 0x73, - 0x65, 0xa0, 0x28, 0x05, 0xdb, 0x5c, 0x58, 0x32, 0x17, 0x9a, 0x58, 0x77, 0x92, 0x4a, 0x20, 0x0f, - 0xa7, 0x81, 0xef, 0x40, 0x8b, 0xb4, 0x4f, 0x58, 0x25, 0x13, 0xc6, 0x3e, 0x7d, 0x01, 0xb5, 0xbc, - 0xe8, 0xc7, 0x11, 0x3a, 0xb5, 0x16, 0x69, 0x57, 0x18, 0x64, 0xd2, 0x55, 0x1c, 0xa1, 0xfb, 0x1a, - 0x4a, 0x93, 0x87, 0x9b, 0x40, 0x2a, 0x7a, 0x0e, 0xd6, 0x32, 0x75, 0x48, 0xab, 0xd8, 0xae, 0xf5, - 0x4e, 0x3a, 0x7f, 0x49, 0x4c, 0x1e, 0x98, 0xb5, 0x4c, 0xdd, 0x6b, 0xf8, 0xef, 0x96, 0x47, 0xc1, - 0x67, 0x94, 0x6a, 0xb8, 0xe0, 0xd1, 0x1c, 0x3f, 0xa2, 0xa2, 0x7d, 0x28, 0x7b, 0x26, 0x91, 0xf9, - 0xc1, 0xb3, 0xbd, 0x83, 0x87, 0x76, 0xb6, 0x75, 0xba, 0x5f, 0x2d, 0xa8, 0x1f, 0xd6, 0x68, 0x1d, - 0xac, 0xb1, 0x6f, 0xa0, 0xda, 0xcc, 0x1a, 0xfb, 0xb4, 0x0f, 0xd6, 0x5d, 0x62, 0x80, 0xd6, 0x7b, - 0x17, 0xff, 0xbc, 0xb2, 0x73, 0x97, 0xa0, 0xe0, 0x2a, 0x88, 0x23, 0x66, 0xdd, 0x25, 0xfa, 0x21, - 0x6e, 0x30, 0xc5, 0x95, 0xc1, 0x7d, 0xc2, 0xb2, 0x84, 0x3e, 0x83, 0xd2, 0x12, 0x37, 0x9a, 0x4d, - 0x86, 0xfa, 0x68, 0x89, 0x9b, 0xb1, 0x4f, 0x2f, 0xe1, 0x14, 0x23, 0x4f, 0x6c, 0x12, 0x7d, 0x7c, - 0xca, 0x57, 0xf3, 0xd8, 0xd0, 0xae, 0x1f, 0x7c, 0xc1, 0x68, 0xe7, 0x18, 0xac, 0xe6, 0x31, 0xab, - 0xe3, 0x41, 0x4e, 0x5b, 0x50, 0xf3, 0xe2, 0x30, 0x11, 0x28, 0xcd, 0x53, 0x96, 0x4c, 0xdb, 0x7d, - 0xc9, 0xbd, 0x80, 0xea, 0x6e, 0x46, 0x0a, 0x50, 0x1a, 0xb2, 0xd1, 0xe0, 0x7e, 0xd4, 0x28, 0xe8, - 0xf8, 0x6a, 0x74, 0x33, 0xba, 0x1f, 0x35, 0x88, 0x9b, 0x42, 0x65, 0xb8, 0x40, 0x6f, 0x29, 0xd7, - 0x21, 0x7d, 0x07, 0xb6, 0x99, 0x85, 0x98, 0x59, 0xce, 0xf7, 0x66, 0xd9, 0x5a, 0x3a, 0xba, 0xb5, - 0x08, 0xd4, 0x22, 0x64, 0xc6, 0xaa, 0x37, 0x52, 0xae, 0x43, 0x03, 0xcb, 0x66, 0x3a, 0x74, 0x5f, - 0x41, 0x75, 0x67, 0xca, 0xba, 0x0e, 0xfb, 0xbd, 0x61, 0xa3, 0x40, 0x8f, 0xa1, 0xf2, 0xf8, 0x78, - 0xcd, 0xe5, 0xe2, 0xc3, 0xfb, 0x06, 0x71, 0x3d, 0x28, 0x5f, 0x71, 0xc5, 0x27, 0xb8, 0xd9, 0x83, - 0x44, 0xf6, 0x21, 0x51, 0xb0, 0x7d, 0xae, 0x78, 0xbe, 0xd9, 0x26, 0xd6, 0x4f, 0x15, 0xa4, 0xf9, - 0x46, 0x5b, 0x41, 0xaa, 0x37, 0xd6, 0x13, 0xc8, 0x15, 0xfa, 0x7a, 0x63, 0x35, 0xe3, 0x22, 0xab, - 0xe6, 0xca, 0x40, 0xbd, 0x39, 0x83, 0xfa, 0x21, 0x45, 0x5a, 0x86, 0x22, 0x47, 0xd9, 0x28, 0x5c, - 0xf6, 0xbf, 0x3f, 0x35, 0xc9, 0x8f, 0xa7, 0x26, 0xf9, 0xf5, 0xd4, 0x24, 0xdf, 0x7e, 0x37, 0x0b, - 0x9f, 0x5e, 0xce, 0x03, 0xb5, 0x58, 0xcf, 0x3a, 0x5e, 0x1c, 0x76, 0xfd, 0xb9, 0xe0, 0xc9, 0xe2, - 0x6d, 0x10, 0x77, 0x33, 0x06, 0xdd, 0xb4, 0xd7, 0x4d, 0x66, 0xb3, 0x92, 0xf9, 0x31, 0xfb, 0x7f, - 0x02, 0x00, 0x00, 0xff, 0xff, 0x1d, 0xc3, 0x11, 0xa3, 0xab, 0x03, 0x00, 0x00, + // 604 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x53, 0xdd, 0x6e, 0xda, 0x4c, + 0x10, 0x65, 0x8d, 0xc3, 0xcf, 0x90, 0x10, 0xbe, 0xd5, 0x57, 0xc9, 0x51, 0x15, 0x4a, 0x1d, 0x55, + 0x42, 0x95, 0x0a, 0x2a, 0x54, 0xbd, 0x27, 0x80, 0x14, 0x44, 0xa2, 0x48, 0xdb, 0x28, 0x8a, 0x7a, + 0x83, 0x16, 0x7b, 0x02, 0x16, 0xf8, 0x47, 0xbb, 0x8b, 0x55, 0x1e, 0xa2, 0xf7, 0x7d, 0xa4, 0x5e, + 0xf6, 0xa2, 0x0f, 0x50, 0xa5, 0x2f, 0x52, 0xed, 0xda, 0xa1, 0x70, 0xd1, 0xbb, 0x99, 0x33, 0xe3, + 0x39, 0xe3, 0x73, 0x66, 0xe1, 0x74, 0xce, 0xfd, 0x05, 0x8a, 0x64, 0xde, 0xeb, 0x24, 0x22, 0x56, + 0x31, 0xad, 0xee, 0x00, 0xf7, 0x27, 0x01, 0x6b, 0x7a, 0x4f, 0x1b, 0x50, 0x5c, 0xe1, 0xd6, 0x21, + 0x2d, 0xd2, 0x3e, 0x66, 0x3a, 0xa4, 0xff, 0xc3, 0x51, 0xca, 0xd7, 0x1b, 0x74, 0x2c, 0x83, 0x65, + 0x09, 0x7d, 0x09, 0xd5, 0x8d, 0x44, 0x31, 0x0b, 0x51, 0x71, 0xa7, 0x68, 0x2a, 0x15, 0x0d, 0xdc, + 0xa0, 0xe2, 0xd4, 0x81, 0x72, 0x8a, 0x42, 0x06, 0x71, 0xe4, 0xd8, 0x2d, 0xd2, 0xb6, 0xd9, 0x73, + 0x4a, 0xcf, 0x01, 0xf0, 0x4b, 0x12, 0x08, 0x94, 0x33, 0xae, 0x9c, 0x23, 0x53, 0xac, 0xe6, 0xc8, + 0x40, 0x51, 0x0a, 0xb6, 0x19, 0x58, 0x32, 0x03, 0x4d, 0xac, 0x99, 0xa4, 0x12, 0xc8, 0xc3, 0x59, + 0xe0, 0x3b, 0xd0, 0x22, 0xed, 0x13, 0x56, 0xc9, 0x80, 0x89, 0x4f, 0x5f, 0x41, 0x2d, 0x2f, 0xfa, + 0x71, 0x84, 0x4e, 0xad, 0x45, 0xda, 0x15, 0x06, 0x19, 0x34, 0x8a, 0x23, 0x74, 0x47, 0x50, 0x9a, + 0xde, 0x5f, 0x07, 0x52, 0xd1, 0x73, 0xb0, 0x56, 0xa9, 0x43, 0x5a, 0xc5, 0x76, 0xad, 0x77, 0xd2, + 0xf9, 0xab, 0xc4, 0xf4, 0x9e, 0x59, 0xab, 0x54, 0xd3, 0xf0, 0xf5, 0x3a, 0xf6, 0x66, 0x02, 0x1f, + 0x0d, 0x8d, 0xcd, 0x2a, 0x06, 0x60, 0xf8, 0xe8, 0x5e, 0xc1, 0x7f, 0x37, 0x3c, 0x0a, 0x1e, 0x51, + 0xaa, 0xe1, 0x92, 0x47, 0x0b, 0xfc, 0x84, 0x8a, 0xf6, 0xa1, 0xec, 0x99, 0x44, 0xe6, 0x53, 0xcf, + 0xf6, 0xa6, 0x1e, 0xb6, 0xb3, 0xe7, 0x4e, 0xf7, 0xab, 0x05, 0xf5, 0xc3, 0x1a, 0xad, 0x83, 0x35, + 0xf1, 0x8d, 0xe2, 0x36, 0xb3, 0x26, 0x3e, 0xed, 0x83, 0x75, 0x9b, 0x18, 0xb5, 0xeb, 0xbd, 0x8b, + 0x7f, 0x8e, 0xec, 0xdc, 0x26, 0x28, 0xb8, 0x0a, 0xe2, 0x88, 0x59, 0xb7, 0x89, 0x76, 0xe9, 0x1a, + 0x53, 0x5c, 0x1b, 0x2f, 0x4e, 0x58, 0x96, 0xd0, 0x17, 0x50, 0x5a, 0xe1, 0x56, 0x0b, 0x97, 0xf9, + 0x70, 0xb4, 0xc2, 0xed, 0xc4, 0xa7, 0x97, 0x70, 0x8a, 0x91, 0x27, 0xb6, 0x89, 0xfe, 0x7c, 0xc6, + 0xd7, 0x8b, 0xd8, 0x58, 0x51, 0x3f, 0xf8, 0x83, 0xf1, 0xae, 0x63, 0xb0, 0x5e, 0xc4, 0xac, 0x8e, + 0x07, 0x39, 0x6d, 0x41, 0xcd, 0x8b, 0xc3, 0x44, 0xa0, 0x34, 0x3e, 0x97, 0x0c, 0xed, 0x3e, 0xe4, + 0x5e, 0x40, 0x75, 0xb7, 0x23, 0x05, 0x28, 0x0d, 0xd9, 0x78, 0x70, 0x37, 0x6e, 0x14, 0x74, 0x3c, + 0x1a, 0x5f, 0x8f, 0xef, 0xc6, 0x0d, 0xe2, 0xa6, 0x50, 0x19, 0x2e, 0xd1, 0x5b, 0xc9, 0x4d, 0x48, + 0xdf, 0x83, 0x6d, 0x76, 0x21, 0x66, 0x97, 0xf3, 0xbd, 0x5d, 0x9e, 0x5b, 0x3a, 0x9a, 0x5a, 0x04, + 0x6a, 0x19, 0x32, 0xd3, 0xaa, 0xcf, 0x55, 0x6e, 0x42, 0x23, 0x96, 0xcd, 0x74, 0xe8, 0xbe, 0x81, + 0xea, 0xae, 0x29, 0x63, 0x1d, 0xf6, 0x7b, 0xc3, 0x46, 0x81, 0x1e, 0x43, 0xe5, 0xe1, 0xe1, 0x8a, + 0xcb, 0xe5, 0xc7, 0x0f, 0x0d, 0xe2, 0x7a, 0x50, 0x1e, 0x71, 0xc5, 0xa7, 0xb8, 0xdd, 0x13, 0x89, + 0xec, 0x8b, 0x44, 0xc1, 0xf6, 0xb9, 0xe2, 0xf9, 0xd9, 0x9b, 0x58, 0x5b, 0x15, 0xa4, 0xf9, 0xb9, + 0x5b, 0x41, 0xaa, 0xcf, 0xd9, 0x13, 0xc8, 0x15, 0xfa, 0xfa, 0x9c, 0xb5, 0xc6, 0x45, 0x56, 0xcd, + 0x91, 0x81, 0x7a, 0x7b, 0x06, 0xf5, 0x43, 0x15, 0x69, 0x19, 0x8a, 0x1c, 0x65, 0xa3, 0x70, 0xd9, + 0xff, 0xfe, 0xd4, 0x24, 0x3f, 0x9e, 0x9a, 0xe4, 0xd7, 0x53, 0x93, 0x7c, 0xfb, 0xdd, 0x2c, 0x7c, + 0x7e, 0xbd, 0x08, 0xd4, 0x72, 0x33, 0xef, 0x78, 0x71, 0xd8, 0xf5, 0x17, 0x82, 0x27, 0xcb, 0x77, + 0x41, 0xdc, 0xcd, 0x34, 0xe8, 0xa6, 0xbd, 0x6e, 0x32, 0x9f, 0x97, 0xcc, 0xab, 0xed, 0xff, 0x09, + 0x00, 0x00, 0xff, 0xff, 0xa7, 0xb8, 0x6c, 0x4c, 0xc8, 0x03, 0x00, 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -666,6 +676,11 @@ func (m *KVList) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if m.AllocRef != 0 { + i = encodeVarintBadgerpb2(dAtA, i, uint64(m.AllocRef)) + i-- + dAtA[i] = 0x50 + } if len(m.Kv) > 0 { for iNdEx := len(m.Kv) - 1; iNdEx >= 0; iNdEx-- { { @@ -932,6 +947,9 @@ func (m *KVList) Size() (n int) { n += 1 + l + sovBadgerpb2(uint64(l)) } } + if m.AllocRef != 0 { + n += 1 + sovBadgerpb2(uint64(m.AllocRef)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1366,6 +1384,25 @@ func (m *KVList) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AllocRef", wireType) + } + m.AllocRef = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBadgerpb2 + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.AllocRef |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipBadgerpb2(dAtA[iNdEx:]) diff --git a/pb/badgerpb2.proto b/pb/badgerpb2.proto index 862e1477c..821185d44 100644 --- a/pb/badgerpb2.proto +++ b/pb/badgerpb2.proto @@ -37,6 +37,9 @@ message KV { message KVList { repeated KV kv = 1; + + // alloc_ref used internally for memory management. + uint64 alloc_ref = 10; } message ManifestChangeSet { diff --git a/stream.go b/stream.go index fdbf76580..d6c5e4317 100644 --- a/stream.go +++ b/stream.go @@ -26,6 +26,7 @@ import ( "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/ristretto/z" humanize "github.com/dustin/go-humanize" "github.com/golang/protobuf/proto" ) @@ -68,6 +69,12 @@ type Stream struct { // with a mismatching key. See example usage in ToList function. Can be left nil to use ToList // function by default. // + // KeyToList has access to z.Allocator accessible via stream.Allocator(itr.ThreadId). This + // allocator can be used to allocate KVs, to decrease the memory pressure on Go GC. Stream + // framework takes care of releasing those resources after calling Send. AllocRef does + // NOT need to be set in the returned KVList, as Stream framework would ignore that field, + // instead using the allocator assigned to that thread id. + // // Note: Calls to KeyToList are concurrent. KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error) @@ -80,11 +87,24 @@ type Stream struct { rangeCh chan keyRange kvChan chan *pb.KVList nextStreamId uint32 + + // Use allocators to generate KVs. + allocatorsMu sync.RWMutex + allocators map[int]*z.Allocator +} + +func (st *Stream) Allocator(threadId int) *z.Allocator { + st.allocatorsMu.RLock() + defer st.allocatorsMu.RUnlock() + return st.allocators[threadId] } // ToList is a default implementation of KeyToList. It picks up all valid versions of the key, // skipping over deleted or expired keys. func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { + alloc := st.Allocator(itr.ThreadId) + ka := alloc.Copy(key) + list := &pb.KVList{} for ; itr.Valid(); itr.Next() { item := itr.Item() @@ -96,17 +116,20 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { break } - valCopy, err := item.ValueCopy(nil) - if err != nil { + kv := y.NewKV(alloc) + kv.Key = ka + + if err := item.Value(func(val []byte) error { + kv.Value = alloc.Copy(val) + return nil + + }); err != nil { return nil, err } - kv := &pb.KV{ - Key: item.KeyCopy(nil), - Value: valCopy, - UserMeta: []byte{item.UserMeta()}, - Version: item.Version(), - ExpiresAt: item.ExpiresAt(), - } + kv.Version = item.Version() + kv.ExpiresAt = item.ExpiresAt() + kv.UserMeta = alloc.Copy([]byte{item.UserMeta()}) + list.Kv = append(list.Kv, kv) if st.db.opt.NumVersionsToKeep == 1 { break @@ -151,6 +174,21 @@ func (st *Stream) produceRanges(ctx context.Context) { close(st.rangeCh) } +func (st *Stream) newAllocator(threadId int) *z.Allocator { + st.allocatorsMu.Lock() + var a *z.Allocator + if cur, ok := st.allocators[threadId]; ok && cur.Size() == 0 { + a = cur // Reuse. + } else { + // Current allocator has been used already. Create a new one. + a = z.NewAllocator(batchSize) + // a.Tag = fmt.Sprintf("Stream %d: %s", threadId, st.LogPrefix) + st.allocators[threadId] = a + } + st.allocatorsMu.Unlock() + return a +} + // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan. func (st *Stream) produceKVs(ctx context.Context, threadId int) error { var size int @@ -175,6 +213,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { streamId := atomic.AddUint32(&st.nextStreamId, 1) outList := new(pb.KVList) + outList.AllocRef = st.newAllocator(threadId).Ref sendIt := func() error { select { @@ -183,9 +222,11 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { return ctx.Err() } outList = new(pb.KVList) + outList.AllocRef = st.newAllocator(threadId).Ref size = 0 return nil } + var prevKey []byte for itr.Seek(kr.left); itr.Valid(); { // it.Valid would only return true for keys with the provided Prefix in iterOpts. @@ -226,13 +267,12 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { } } } - if len(outList.Kv) > 0 { - // TODO: Think of a way to indicate that a stream is over. - if err := sendIt(); err != nil { - return err - } - } - return nil + // Mark the stream as done. + outList.Kv = append(outList.Kv, &pb.KV{ + StreamId: streamId, + StreamDone: true, + }) + return sendIt() } for { @@ -258,6 +298,13 @@ func (st *Stream) streamKVs(ctx context.Context) error { defer t.Stop() now := time.Now() + var allocs []*z.Allocator + defer func() { + for _, a := range allocs { + a.Release() + } + }() + sendBatch := func(batch *pb.KVList) error { sz := uint64(proto.Size(batch)) bytesSent += sz @@ -268,6 +315,11 @@ func (st *Stream) streamKVs(ctx context.Context) error { } st.db.opt.Infof("%s Created batch of size: %s in %s.\n", st.LogPrefix, humanize.Bytes(sz), time.Since(t)) + + for _, a := range allocs { + a.Release() + } + allocs = allocs[:0] return nil } @@ -288,6 +340,8 @@ func (st *Stream) streamKVs(ctx context.Context) error { } y.AssertTrue(kvs != nil) batch.Kv = append(batch.Kv, kvs.Kv...) + allocs = append(allocs, z.AllocatorFrom(kvs.AllocRef)) + default: break loop } @@ -309,8 +363,9 @@ outer: continue } speed := bytesSent / durSec - st.db.opt.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec\n", st.LogPrefix, - y.FixedDuration(dur), humanize.Bytes(bytesSent), humanize.Bytes(speed)) + st.db.opt.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec, jemalloc: %s\n", + st.LogPrefix, y.FixedDuration(dur), humanize.IBytes(bytesSent), + humanize.IBytes(speed), humanize.IBytes(uint64(z.NumAllocBytes()))) case kvs, ok := <-st.kvChan: if !ok { @@ -318,6 +373,7 @@ outer: } y.AssertTrue(kvs != nil) batch = kvs + allocs = append(allocs, z.AllocatorFrom(kvs.AllocRef)) // Otherwise, slurp more keys into this batch. if err := slurp(batch); err != nil { @@ -385,11 +441,20 @@ func (st *Stream) Orchestrate(ctx context.Context) error { // Wait for key streaming to be over. err := <-kvErr + + for _, a := range st.allocators { + a.Release() + } return err } func (db *DB) newStream() *Stream { - return &Stream{db: db, NumGo: 16, LogPrefix: "Badger.Stream"} + return &Stream{ + db: db, + NumGo: 16, + LogPrefix: "Badger.Stream", + allocators: make(map[int]*z.Allocator), + } } // NewStream creates a new Stream. diff --git a/stream_test.go b/stream_test.go index f3c4b1d48..f0bd8bd0c 100644 --- a/stream_test.go +++ b/stream_test.go @@ -27,6 +27,8 @@ import ( bpb "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/ristretto/z" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -50,7 +52,13 @@ type collector struct { } func (c *collector) Send(list *bpb.KVList) error { - c.kv = append(c.kv, list.Kv...) + for _, kv := range list.Kv { + if kv.StreamDone == true { + continue + } + cp := proto.Clone(kv).(*bpb.KV) + c.kv = append(c.kv, cp) + } return nil } @@ -156,6 +164,7 @@ func TestStream(t *testing.T) { require.Equal(t, 50, count, "Count mismatch for pred: %s", pred) } require.NoError(t, db.Close()) + require.Equal(t, int64(0), z.NumAllocBytes()) } func TestStreamWithThreadId(t *testing.T) { diff --git a/stream_writer.go b/stream_writer.go index ecdb079fc..71c934ae5 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -112,7 +112,7 @@ func (sw *StreamWriter) Write(kvs *pb.KVList) error { } e := &Entry{ Key: y.KeyWithTs(kv.Key, kv.Version), - Value: kv.Value, + Value: y.Copy(kv.Value), UserMeta: userMeta, ExpiresAt: kv.ExpiresAt, meta: meta, diff --git a/test.sh b/test.sh index 23bb660c2..fdf899e48 100755 --- a/test.sh +++ b/test.sh @@ -15,16 +15,19 @@ pushd badger go build -v . popd +# tags="-tags=jemalloc" +tags="" + # Run the memory intensive tests first. -go test -v -run='TestBigKeyValuePairs$' --manual=true -go test -v -run='TestPushValueLogLimit' --manual=true +go test -v $tags -run='TestBigKeyValuePairs$' --manual=true +go test -v $tags -run='TestPushValueLogLimit' --manual=true # Run the special Truncate test. rm -rf p -go test -v -run='TestTruncateVlogNoClose$' --manual=true +go test -v $tags -run='TestTruncateVlogNoClose$' --manual=true truncate --size=4096 p/000000.vlog -go test -v -run='TestTruncateVlogNoClose2$' --manual=true -go test -v -run='TestTruncateVlogNoClose3$' --manual=true +go test -v $tags -run='TestTruncateVlogNoClose2$' --manual=true +go test -v $tags -run='TestTruncateVlogNoClose3$' --manual=true rm -rf p # Run the normal tests. @@ -32,8 +35,8 @@ echo "==> Starting tests.. " # go test -timeout=25m -v -race github.com/dgraph-io/badger/v2/... for pkg in $packages; do echo "===> Testing $pkg" - go test -timeout=25m -v -race $pkg + go test $tags -timeout=25m -v -race $pkg done echo "===> Testing root level" -go test -timeout=25m -v . -race +go test $tags -timeout=25m -v . -race diff --git a/y/y.go b/y/y.go index cd75c3dfd..e527d42a1 100644 --- a/y/y.go +++ b/y/y.go @@ -29,6 +29,8 @@ import ( "time" "unsafe" + "github.com/dgraph-io/badger/v2/pb" + "github.com/dgraph-io/ristretto/z" "github.com/pkg/errors" ) @@ -465,3 +467,13 @@ func (r *PageBufferReader) Read(p []byte) (int, error) { return read, nil } + +const kvsz = int(unsafe.Sizeof(pb.KV{})) + +func NewKV(alloc *z.Allocator) *pb.KV { + if alloc == nil { + return &pb.KV{} + } + b := alloc.AllocateAligned(kvsz) + return (*pb.KV)(unsafe.Pointer(&b[0])) +}