Skip to content

Commit

Permalink
Update compression method and expose badger options (#31)
Browse files Browse the repository at this point in the history
1. Use zlib compression
2. Add statsd metrics for keys added and keys deleted
3. Improved compaction and key deletion latency
4. Expose badger options as follows:
```
badger: 
  levelOneSize: 204800000
  maxLevels: 3
  syncWrites: false
```
  • Loading branch information
a9kitkumarsinha committed Jul 1, 2020
1 parent 565435b commit 4bd46c7
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 39 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ We are open to contributions, feel free to submit a pull request and we'll revie
* [Roman Atachiants](https://www.linkedin.com/in/atachiants/)
* [Yichao Wang](https://www.linkedin.com/in/wangyichao/)
* [Chun Rong Phang](https://www.linkedin.com/in/phang-chun-rong-6232ab78/)
* [Ankit Kumar Sinha](https://www.linkedin.com/in/ankit-kumar-sinha-805359b6/)

## License

Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ require (
github.com/armon/go-metrics v0.3.3 // indirect
github.com/aws/aws-sdk-go v1.30.25
github.com/crphang/orc v0.0.3
github.com/dgraph-io/badger/v2 v2.0.3
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/emitter-io/address v1.0.0
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.4.1 // indirect
github.com/golang/snappy v0.0.1
github.com/google/btree v1.0.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d // indirect
github.com/gorilla/mux v1.7.4
github.com/grab/async v0.0.5
Expand All @@ -39,6 +41,8 @@ require (
github.com/kelindar/loader v0.0.10
github.com/kelindar/lua v0.0.6
github.com/miekg/dns v1.1.29 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/myteksi/hystrix-go v1.1.3
github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af
github.com/satori/go.uuid v1.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ github.com/crphang/orc v0.0.3/go.mod h1:+siY09J77eYa8+0+UXZxeyv8nrmhvOY40DbqB7gR
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v2 v2.0.3 h1:inzdf6VF/NZ+tJ8RwwYMjJMvsOALTHYdozn0qSl6XJI=
github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798 h1:PcjL/wjye6pw6IZK1SEKqp+wPpBm+YFx8gO7lBFArW8=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po=
Expand Down
21 changes: 21 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// BadgerDefault selects a pre-tuned profile of badger options
// (see the switch on options.Default in internal/storage/disk).
type BadgerDefault string

// Pre-defined badger tuning profiles referenced by Badger.Default.
const (
// BadgerStorage tunes badger for long-term storage workloads.
BadgerStorage BadgerDefault = "storage"
// BadgerIngestion tunes badger for write-heavy ingestion workloads.
BadgerIngestion BadgerDefault = "ingestion"
// BadgerDefaultOption keeps the defaults that badger itself provides.
BadgerDefaultOption BadgerDefault = "default"
)

// Config global
type Config struct {
URI string `json:"uri" yaml:"uri" env:"URI"`
Expand All @@ -23,6 +31,19 @@ type Config struct {
Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"`
Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"`
K8s *K8s `json:"k8s,omitempty" yaml:"k8s" env:"K8S"`
Badger Badger `json:"badger,omitempty" yaml:"badger" env:"BADGER"`
}

// Badger holds tuning options for the underlying badger key-value store.
// It helps optimize for various use cases such as bigger files, point
// queries or range queries. Pointer fields are optional: when nil, the
// store falls back to its built-in value for that option.
type Badger struct {
SyncWrites *bool `json:"syncWrites" yaml:"syncWrites" env:"SYNCWRITES"` // Whether to sync writes to disk before ack. defaults to true
ValueLogMaxEntries *uint32 `json:"valueLogMaxEntries" yaml:"valueLogMaxEntries" env:"VALUELOGMAXENTRIES"` // Maximum number of entries a value log file can hold approximately. defaults to 5000
MaxTableSize *int64 `json:"maxTableSize" yaml:"maxTableSize" env:"MAXTABLESIZE"` // Maximum size in bytes for each LSM table or file.
LevelOneSize *int64 `json:"levelOneSize" yaml:"levelOneSize" env:"LEVELONESIZE"` // Maximum total size in bytes for Level 1 — default depends on the selected profile; TODO confirm
LevelSizeMultiplier *int `json:"levelSizeMultiplier" yaml:"levelSizeMultiplier" env:"LEVELSIZEMULTIPLIER"` // The ratio between the maximum sizes of contiguous levels in the LSM. defaults to 10
MaxLevels *int `json:"maxLevels" yaml:"maxLevels" env:"MAXLEVELS"` // Maximum number of levels of compaction allowed in the LSM. defaults to 7
Default BadgerDefault `json:"default" yaml:"default" env:"DEFAULT"` // default badger option profile to optimize for storage, ingestion or the defaults badger provides
}

type K8s struct {
Expand Down
1 change: 1 addition & 0 deletions internal/server/server_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
}
}

s.monitor.Count("server", "ingestCount", int64(len(blocks)))
return nil, nil
}
10 changes: 8 additions & 2 deletions internal/storage/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (s *Storage) Delete(keys ...key.Key) error {

// Compact runs the compaction on the storage
func (s *Storage) Compact(ctx context.Context) (interface{}, error) {
st := time.Now()
var hash uint32
var blocks []block.Block
var merged []key.Key
Expand Down Expand Up @@ -134,7 +135,9 @@ func (s *Storage) Compact(ctx context.Context) (interface{}, error) {

// Wait for the pool to be close
close(queue)
return wpool.Outcome()
out, err := wpool.Outcome()
s.monitor.Histogram(ctxTag, "compactlatency", float64(time.Since(st)))
return out, err
}

// merge adds an key-value pair to the underlying database
Expand Down Expand Up @@ -163,11 +166,14 @@ func (s *Storage) merge(keys []key.Key, blocks []block.Block, schema typeof.Sche
}
}

start := time.Now()
// Delete all of the keys that we have appended
if err = s.buffer.Delete(keys...); err != nil {
s.monitor.Count1(ctxTag, "error", "type:delete")
s.monitor.Error(err)
s.monitor.Error(errors.Internal("merge error %s", err))
}
s.monitor.Histogram(ctxTag, "deletelatency", float64(time.Since(start)))
s.monitor.Count(ctxTag, "deleteCount", int64(len(keys)))
return
})
}
Expand Down
7 changes: 6 additions & 1 deletion internal/storage/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/kelindar/talaria/internal/config"

"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/encoding/typeof"
Expand Down Expand Up @@ -60,7 +62,10 @@ func runTest(t *testing.T, test func(store *disk.Storage)) {
func run(f func(store *disk.Storage)) {
dir, _ := ioutil.TempDir("", "test")
store := disk.New(monitor.NewNoop())
_ = store.Open(dir)
syncWrite := false
_ = store.Open(dir, config.Badger{
SyncWrites: &syncWrite,
})

// Close once we're done and delete data
defer func() { _ = os.RemoveAll(dir) }()
Expand Down
70 changes: 58 additions & 12 deletions internal/storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/dgraph-io/badger/v2"
"github.com/grab/async"
"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/errors"
Expand Down Expand Up @@ -46,18 +47,18 @@ func New(m monitor.Monitor) *Storage {
}

// Open creates a disk storage and open the directory
func Open(dir string, name string, monitor monitor.Monitor) *Storage {
// Open creates a disk storage rooted at dir/name and opens it with the
// given badger options. It panics when the underlying store cannot be
// opened, so callers get a hard failure at startup rather than a broken
// table.
func Open(dir string, name string, monitor monitor.Monitor, options config.Badger) *Storage {
	store := New(monitor)
	if err := store.Open(path.Join(dir, name), options); err != nil {
		panic(err)
	}
	return store
}

// Open opens a directory.
func (s *Storage) Open(dir string) error {
func (s *Storage) Open(dir string, options config.Badger) error {

// Default to a /data directory
if dir == "" {
Expand All @@ -69,16 +70,61 @@ func (s *Storage) Open(dir string) error {
return err
}

// Create the options
opts := badger.DefaultOptions(dir)
opts.SyncWrites = false
opts.MaxTableSize = 64 << 15
opts.ValueLogMaxEntries = 5000
opts.LevelOneSize = 1 << 16
opts.LevelSizeMultiplier = 3
opts.MaxLevels = 25
opts.Truncate = true
opts.Logger = &logger{s.monitor}

switch options.Default {
case config.BadgerStorage:
opts = opts.WithMaxLevels(64 << 15).WithValueLogMaxEntries(5000).WithLevelOneSize(1 << 16).WithLevelSizeMultiplier(3).WithMaxLevels(25).WithSyncWrites(false)
case config.BadgerIngestion:
opts = opts.WithLevelOneSize(204800000).WithMaxLevels(3).WithSyncWrites(false)
}

// Create the options
if options.SyncWrites != nil {
opts = opts.WithSyncWrites(*options.SyncWrites)
}

// max size of lsm tree in bytes after which data is propagated to disk.
// The default is 64 MB.
if options.MaxTableSize != nil {
opts = opts.WithMaxTableSize(*options.MaxTableSize)
}

// ValueLogMaxEntries sets the maximum number of entries a value log file can hold approximately.
// The actual size limit of a value log file is the minimum of ValueLogFileSize and
// ValueLogMaxEntries.
// The default value of ValueLogMaxEntries is 1 Million
if options.ValueLogMaxEntries != nil {
opts = opts.WithValueLogMaxEntries(*options.ValueLogMaxEntries)
}

// LevelOneSize sets the maximum total size for Level 1.
// The default value of LevelOneSize is 20MB.
if options.LevelOneSize != nil {
opts = opts.WithLevelOneSize(*options.LevelOneSize)
}

// Maximum number of levels of compaction allowed in the LSM.
// The default value of MaxLevels is 7.
if options.MaxLevels != nil {
opts = opts.WithMaxLevels(*options.MaxLevels)
}

// LevelSizeMultiplier sets the ratio between the maximum sizes of contiguous levels in the LSM.
// Once a level grows to be larger than this ratio allowed, the compaction process will be
// triggered.
// The default value of LevelSizeMultiplier is 10.
if options.LevelSizeMultiplier != nil {
opts = opts.WithLevelSizeMultiplier(*options.LevelSizeMultiplier)
}

// Truncate indicates whether value log files should be truncated to delete corrupt data, if any.
// This option is ignored when ReadOnly is true.
// The default value of Truncate is false.
opts = opts.WithTruncate(true)

opts = opts.WithLogger(&logger{s.monitor})
s.monitor.Info("opening badger with options %+v", opts)

// Attempt to open the database
db, err := badger.Open(opts)
Expand Down
9 changes: 7 additions & 2 deletions internal/storage/disk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/kelindar/talaria/internal/config"

"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/monitor"
"github.com/stretchr/testify/assert"
Expand All @@ -28,7 +30,7 @@ func runTest(t *testing.T, test func(store *Storage)) {
func run(f func(store *Storage)) {
dir, _ := ioutil.TempDir("", "test")
store := New(monitor.NewNoop())
_ = store.Open(dir)
_ = store.Open(dir, config.Badger{})

// Close once we're done and delete data
defer func() { _ = os.RemoveAll(dir) }()
Expand Down Expand Up @@ -250,7 +252,10 @@ func populate(store *Storage) {

func TestOpen(t *testing.T) {
assert.NotPanicsf(t, func() {
disk := Open(".", "test-table", monitor.NewNoop())
syncWrites := false
disk := Open(".", "test-table", monitor.NewNoop(), config.Badger{
SyncWrites: &syncWrites,
})
assert.NotNil(t, disk)
disk.Close()
os.RemoveAll("test-table")
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/flush/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package flush

import (
"bytes"
"compress/flate"
"sync"
"time"

Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *Storage) Merge(blocks []block.Block, schema typeof.Schema) ([]byte, []b
buffer := s.memoryPool.Get().(*bytes.Buffer)
writer, err := eorc.NewWriter(buffer,
eorc.SetSchema(orcSchema),
eorc.SetCompression(eorc.CompressionSnappy{}))
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))

for _, blk := range blocks {
rows, err := blk.Select(blk.Schema())
Expand Down
5 changes: 3 additions & 2 deletions internal/storage/flush/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package flush

import (
"bytes"
"compress/flate"
"io/ioutil"
"testing"

Expand Down Expand Up @@ -74,7 +75,7 @@ func TestMerge(t *testing.T) {
orcBuffer := &bytes.Buffer{}
writer, _ = eorc.NewWriter(orcBuffer,
eorc.SetSchema(orcSchema),
eorc.SetCompression(eorc.CompressionSnappy{}))
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))
_ = writer.Write("eventName", 1, 1.0)
_ = writer.Write("eventName", 2, 2.0)
_ = writer.Close()
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestMerge_DifferentSchema(t *testing.T) {
orcBuffer := &bytes.Buffer{}
writer, _ = eorc.NewWriter(orcBuffer,
eorc.SetSchema(orcSchema2),
eorc.SetCompression(eorc.CompressionSnappy{}))
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))
_ = writer.Write("eventName", 1, 1.0, nil)
_ = writer.Write("eventName", 2, 2.0, "s")
_ = writer.Close()
Expand Down
2 changes: 1 addition & 1 deletion internal/table/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Table struct {

// New creates a new table implementation.
func New(cfg config.Func, cluster Membership, monitor monitor.Monitor) *Table {
store := disk.Open(cfg().Storage.Directory, cfg().Tables.Log.Name, monitor)
store := disk.Open(cfg().Storage.Directory, cfg().Tables.Log.Name, monitor, cfg().Badger)
base := timeseries.New(cluster, monitor, store, timeseries.Config{
Name: cfg().Tables.Log.Name,
TTL: cfg().Tables.Log.TTL,
Expand Down
14 changes: 7 additions & 7 deletions internal/table/timeseries/timeseries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
package timeseries_test

import (
"io/ioutil"
"os"
"testing"

"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/typeof"
Expand All @@ -12,9 +16,6 @@ import (
"github.com/kelindar/talaria/internal/storage/disk"
"github.com/kelindar/talaria/internal/table/timeseries"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"testing"
)

const testFile2 = "../../../test/test2.orc"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestTimeseries_DynamicSchema(t *testing.T) {
}

monitor := monitor2.NewNoop()
store := disk.Open(dir, timeseriesCfg.Name, monitor)
store := disk.Open(dir, timeseriesCfg.Name, monitor, config.Badger{})

// Start the server and open the database
eventlog := timeseries.New(new(noopMembership), monitor, store, timeseriesCfg)
Expand Down Expand Up @@ -147,7 +148,7 @@ int1: int64
}

monitor := monitor2.NewNoop()
store := disk.Open(dir, timeseriesCfg.Name, monitor)
store := disk.Open(dir, timeseriesCfg.Name, monitor, config.Badger{})

// Start the server and open the database
eventlog := timeseries.New(new(noopMembership), monitor, store, timeseriesCfg)
Expand All @@ -157,12 +158,11 @@ int1: int64
assert.Nil(t, err)
expectedSchema := typeof.Schema{
"string1": typeof.String,
"int1": typeof.Int64,
"int1": typeof.Int64,
}
assert.Equal(t, expectedSchema, actualSchema)
}


func newSplitQuery(eventName, colName string) *presto.PrestoThriftTupleDomain {
return &presto.PrestoThriftTupleDomain{
Domains: map[string]*presto.PrestoThriftDomain{
Expand Down
Loading

0 comments on commit 4bd46c7

Please sign in to comment.