Update compression method and expose badger options #31

Merged · 47 commits · Jul 1, 2020

Commits:
018f5ef  revert changes for compression (a9kitkumarsinha, Jun 11, 2020)
d4f6301  Add a compact before close (a9kitkumarsinha, Jun 11, 2020)
737992b  add badger metrics (a9kitkumarsinha, Jun 16, 2020)
3dca77a  add stats for compaction and ingestion (a9kitkumarsinha, Jun 16, 2020)
e8f9b14  use default badger options (a9kitkumarsinha, Jun 17, 2020)
cff8a7e  use badger default options (a9kitkumarsinha, Jun 17, 2020)
a3b6524  use default badger options (a9kitkumarsinha, Jun 17, 2020)
2a175ec  add latency for delete (a9kitkumarsinha, Jun 17, 2020)
803c323  testing (a9kitkumarsinha, Jun 17, 2020)
466a8cd  fix typo (a9kitkumarsinha, Jun 17, 2020)
9ae72c1  fix typo (a9kitkumarsinha, Jun 17, 2020)
7ae9fef  fix typo (a9kitkumarsinha, Jun 17, 2020)
563ac41  fix typo (a9kitkumarsinha, Jun 17, 2020)
f5830fd  fix typo (a9kitkumarsinha, Jun 17, 2020)
67c2a0b  fix typo (a9kitkumarsinha, Jun 17, 2020)
11d60c3  fix typo (a9kitkumarsinha, Jun 18, 2020)
2f7d467  fix typo (a9kitkumarsinha, Jun 20, 2020)
b35a6e7  fix typo (a9kitkumarsinha, Jun 20, 2020)
75c05ad  Add profiler (crphang, Jun 23, 2020)
8e26727  Add a compact before close (a9kitkumarsinha, Jun 24, 2020)
c47e9d6  Merge remote-tracking branch 'origin/revert-compaction' into revert-c… (a9kitkumarsinha, Jun 24, 2020)
b4bec07  Add a compact before close (a9kitkumarsinha, Jun 24, 2020)
88159d1  Add a compact before close (a9kitkumarsinha, Jun 24, 2020)
b9e4138  Add a compact before close (a9kitkumarsinha, Jun 24, 2020)
dc5b161  Add pprof handlers (crphang, Jun 24, 2020)
9245a4c  Add a compact before close (a9kitkumarsinha, Jun 24, 2020)
04bbc0d  Merge remote-tracking branch 'origin/revert-compaction' into revert-c… (a9kitkumarsinha, Jun 24, 2020)
92a61ec  Fix pprof setup (crphang, Jun 24, 2020)
36b66b1  Merge branch 'revert-compaction' of github.com:kelindar/talaria into … (crphang, Jun 24, 2020)
d4e8841  Add a compact before close (a9kitkumarsinha, Jun 24, 2020)
eb9eb69  Merge remote-tracking branch 'origin/revert-compaction' into revert-c… (a9kitkumarsinha, Jun 24, 2020)
cca5ce0  Add a compact before close (a9kitkumarsinha, Jun 24, 2020)
834ca70  Add a compact before close (a9kitkumarsinha, Jun 25, 2020)
a2c4484  Add a compact before close (a9kitkumarsinha, Jun 25, 2020)
6356b25  Add a compact before close (a9kitkumarsinha, Jun 25, 2020)
1deca50  Add a compact before close (a9kitkumarsinha, Jun 26, 2020)
875b6df  Add a compact before close (a9kitkumarsinha, Jun 26, 2020)
929d7ba  use master badger and default options (a9kitkumarsinha, Jun 27, 2020)
931241a  Add badger options in the config (a9kitkumarsinha, Jun 29, 2020)
0581ae9  go mod tidy (a9kitkumarsinha, Jun 29, 2020)
3fcba9f  improve disk options logging on init (a9kitkumarsinha, Jun 29, 2020)
f99c07d  improve disk options logging on init (a9kitkumarsinha, Jun 29, 2020)
89d40c9  fix the compact test (a9kitkumarsinha, Jun 29, 2020)
a12b63a  fix the compact test (a9kitkumarsinha, Jun 29, 2020)
d0c9a02  add default options for badger (a9kitkumarsinha, Jul 1, 2020)
c4fab21  add default options for badger (a9kitkumarsinha, Jul 1, 2020)
c731e25  add ankit to list of contributors (a9kitkumarsinha, Jul 1, 2020)
1 change: 1 addition & 0 deletions README.md
@@ -139,6 +139,7 @@ We are open to contributions, feel free to submit a pull request and we'll revie
* [Roman Atachiants](https://www.linkedin.com/in/atachiants/)
* [Yichao Wang](https://www.linkedin.com/in/wangyichao/)
* [Chun Rong Phang](https://www.linkedin.com/in/phang-chun-rong-6232ab78/)
* [Ankit Kumar Sinha](https://www.linkedin.com/in/ankit-kumar-sinha-805359b6/)

## License

6 changes: 5 additions & 1 deletion go.mod
@@ -16,14 +16,16 @@ require (
github.com/armon/go-metrics v0.3.3 // indirect
github.com/aws/aws-sdk-go v1.30.25
github.com/crphang/orc v0.0.3
github.com/dgraph-io/badger/v2 v2.0.3
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dnaeon/go-vcr v1.0.1 // indirect
github.com/emitter-io/address v1.0.0
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.4.1 // indirect
github.com/golang/snappy v0.0.1
github.com/google/btree v1.0.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d // indirect
github.com/gorilla/mux v1.7.4
github.com/grab/async v0.0.5
@@ -39,6 +41,8 @@ require (
github.com/kelindar/loader v0.0.10
github.com/kelindar/lua v0.0.6
github.com/miekg/dns v1.1.29 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/myteksi/hystrix-go v1.1.3
github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af
github.com/satori/go.uuid v1.2.0 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -114,8 +114,8 @@ github.com/crphang/orc v0.0.3/go.mod h1:+siY09J77eYa8+0+UXZxeyv8nrmhvOY40DbqB7gR
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v2 v2.0.3 h1:inzdf6VF/NZ+tJ8RwwYMjJMvsOALTHYdozn0qSl6XJI=
github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798 h1:PcjL/wjye6pw6IZK1SEKqp+wPpBm+YFx8gO7lBFArW8=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po=
21 changes: 21 additions & 0 deletions internal/config/config.go
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// BadgerDefault selects a named preset of Badger options.
type BadgerDefault string

// Supported presets for the Badger store.
const (
BadgerStorage BadgerDefault = "storage"
BadgerIngestion BadgerDefault = "ingestion"
BadgerDefaultOption BadgerDefault = "default"
)

// Config global
type Config struct {
URI string `json:"uri" yaml:"uri" env:"URI"`
@@ -23,6 +31,19 @@ type Config struct {
Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"`
Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"`
K8s *K8s `json:"k8s,omitempty" yaml:"k8s" env:"K8S"`
Badger Badger `json:"badger,omitempty" yaml:"badger" env:"BADGER"`
}

// Badger configures the underlying Badger key-value store.
// These options help tune the store for various use cases, such as bigger files, point queries, or range queries.
type Badger struct {
SyncWrites *bool `json:"syncWrites" yaml:"syncWrites" env:"SYNCWRITES"` // Whether to sync writes to disk before acknowledging. Defaults to true.
ValueLogMaxEntries *uint32 `json:"valueLogMaxEntries" yaml:"valueLogMaxEntries" env:"VALUELOGMAXENTRIES"` // Approximate maximum number of entries a value log file can hold. Defaults to 5000.
MaxTableSize *int64 `json:"maxTableSize" yaml:"maxTableSize" env:"MAXTABLESIZE"` // Maximum size in bytes for each LSM table or file.
LevelOneSize *int64 `json:"levelOneSize" yaml:"levelOneSize" env:"LEVELONESIZE"` // Maximum total size in bytes for Level 1. Defaults to 1 million.
LevelSizeMultiplier *int `json:"levelSizeMultiplier" yaml:"levelSizeMultiplier" env:"LEVELSIZEMULTIPLIER"` // Ratio between the maximum sizes of contiguous levels in the LSM. Defaults to 10.
MaxLevels *int `json:"maxLevels" yaml:"maxLevels" env:"MAXLEVELS"` // Maximum number of levels of compaction allowed in the LSM. Defaults to 7.
Default BadgerDefault `json:"default" yaml:"default" env:"DEFAULT"` // Named preset to optimize for storage, ingestion, or the stock defaults that Badger provides.
}

type K8s struct {
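To make the new knobs concrete, here is a minimal sketch of what this section might look like in a Talaria YAML configuration file. The keys come from the yaml tags in the Badger struct above; every value shown is an illustrative assumption, not a recommendation:

```yaml
# Hypothetical tuning for an ingestion-heavy deployment (illustrative values).
badger:
  default: ingestion       # preset: "storage", "ingestion" or "default"
  syncWrites: false        # do not fsync before acknowledging each write
  maxTableSize: 67108864   # 64 MB per LSM table
  valueLogMaxEntries: 5000 # approximate cap on entries per value log file
  levelOneSize: 209715200  # 200 MB total for level 1
  levelSizeMultiplier: 10  # size ratio between adjacent LSM levels
  maxLevels: 3             # shallow tree to favor ingestion
```

Fields left out keep whatever the chosen preset (or Badger itself) provides; the struct uses pointer types precisely so that "unset" can be distinguished from an explicit zero value.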
1 change: 1 addition & 0 deletions internal/server/server_ingest.go
@@ -40,5 +40,6 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
}
}

s.monitor.Count("server", "ingestCount", int64(len(blocks)))
return nil, nil
}
10 changes: 8 additions & 2 deletions internal/storage/compact/compact.go
@@ -82,6 +82,7 @@ func (s *Storage) Delete(keys ...key.Key) error {

// Compact runs the compaction on the storage
func (s *Storage) Compact(ctx context.Context) (interface{}, error) {
st := time.Now()
var hash uint32
var blocks []block.Block
var merged []key.Key
@@ -134,7 +135,9 @@

// Wait for the pool to close
close(queue)
return wpool.Outcome()
out, err := wpool.Outcome()
s.monitor.Histogram(ctxTag, "compactlatency", float64(time.Since(st)))
return out, err
}

// merge adds a key-value pair to the underlying database
@@ -163,11 +166,14 @@ func (s *Storage) merge(keys []key.Key, blocks []block.Block, schema typeof.Sche
}
}

start := time.Now()
// Delete all of the keys that we have appended
if err = s.buffer.Delete(keys...); err != nil {
s.monitor.Count1(ctxTag, "error", "type:delete")
s.monitor.Error(err)
s.monitor.Error(errors.Internal("merge error %s", err))
}
s.monitor.Histogram(ctxTag, "deletelatency", float64(time.Since(start)))
s.monitor.Count(ctxTag, "deleteCount", int64(len(keys)))
return
})
}
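One subtlety in the instrumentation above: converting a time.Duration with float64(time.Since(start)) yields nanoseconds, since time.Duration is an int64 nanosecond count. A minimal sketch of the timing pattern used by Compact and merge; the helper name is hypothetical and the Histogram signature is assumed from its call sites above:

```go
package metrics

import (
	"time"

	"github.com/kelindar/talaria/internal/monitor"
)

// measure is a hypothetical helper (not part of this PR) illustrating the
// latency-histogram pattern used in Compact and merge.
func measure(m monitor.Monitor, tag, name string, work func()) {
	start := time.Now()
	work()
	// time.Duration is an int64 nanosecond count, so this histogram records
	// nanoseconds; divide by float64(time.Millisecond) if the dashboard
	// expects milliseconds instead.
	m.Histogram(tag, name, float64(time.Since(start)))
}
```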
7 changes: 6 additions & 1 deletion internal/storage/compact/compact_test.go
@@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/kelindar/talaria/internal/config"

"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/encoding/typeof"
@@ -60,7 +62,10 @@ func runTest(t *testing.T, test func(store *disk.Storage)) {
func run(f func(store *disk.Storage)) {
dir, _ := ioutil.TempDir("", "test")
store := disk.New(monitor.NewNoop())
_ = store.Open(dir)
syncWrite := false
_ = store.Open(dir, config.Badger{
SyncWrites: &syncWrite,
})

// Close once we're done and delete data
defer func() { _ = os.RemoveAll(dir) }()
70 changes: 58 additions & 12 deletions internal/storage/disk/disk.go
@@ -16,6 +16,7 @@ import (

"github.com/dgraph-io/badger/v2"
"github.com/grab/async"
"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/errors"
@@ -46,18 +47,18 @@ func New(m monitor.Monitor) *Storage {
}

// Open creates a disk storage and opens the directory
func Open(dir string, name string, monitor monitor.Monitor) *Storage {
func Open(dir string, name string, monitor monitor.Monitor, options config.Badger) *Storage {
diskStorage := New(monitor)
tableDir := path.Join(dir, name)
err := diskStorage.Open(tableDir)
err := diskStorage.Open(tableDir, options)
if err != nil {
panic(err)
}
return diskStorage
}

// Open opens a directory.
func (s *Storage) Open(dir string) error {
func (s *Storage) Open(dir string, options config.Badger) error {

// Default to a /data directory
if dir == "" {
@@ -69,16 +70,61 @@ func (s *Storage) Open(dir string) error {
return err
}

// Create the options
opts := badger.DefaultOptions(dir)
opts.SyncWrites = false
opts.MaxTableSize = 64 << 15
opts.ValueLogMaxEntries = 5000
opts.LevelOneSize = 1 << 16
opts.LevelSizeMultiplier = 3
opts.MaxLevels = 25
opts.Truncate = true
opts.Logger = &logger{s.monitor}

switch options.Default {
case config.BadgerStorage:
opts = opts.WithMaxTableSize(64 << 15).WithValueLogMaxEntries(5000).WithLevelOneSize(1 << 16).WithLevelSizeMultiplier(3).WithMaxLevels(25).WithSyncWrites(false)
case config.BadgerIngestion:
opts = opts.WithLevelOneSize(204800000).WithMaxLevels(3).WithSyncWrites(false)
}

// Apply explicit per-field overrides on top of the chosen preset
if options.SyncWrites != nil {
opts = opts.WithSyncWrites(*options.SyncWrites)
}

// MaxTableSize sets the maximum size in bytes for each LSM table, after which
// data is flushed to disk. The default is 64 MB.
if options.MaxTableSize != nil {
opts = opts.WithMaxTableSize(*options.MaxTableSize)
}

// ValueLogMaxEntries sets the approximate maximum number of entries a value log file can hold.
// The actual size limit of a value log file is the minimum of ValueLogFileSize and
// ValueLogMaxEntries.
// The default value of ValueLogMaxEntries is 1 million.
if options.ValueLogMaxEntries != nil {
opts = opts.WithValueLogMaxEntries(*options.ValueLogMaxEntries)
}

// LevelOneSize sets the maximum total size for Level 1.
// The default value of LevelOneSize is 20MB.
if options.LevelOneSize != nil {
opts = opts.WithLevelOneSize(*options.LevelOneSize)
}

// Maximum number of levels of compaction allowed in the LSM.
// The default value of MaxLevels is 7.
if options.MaxLevels != nil {
opts = opts.WithMaxLevels(*options.MaxLevels)
}

// LevelSizeMultiplier sets the ratio between the maximum sizes of contiguous levels in the LSM.
// Once a level grows larger than this ratio allows, the compaction process is triggered.
// The default value of LevelSizeMultiplier is 10.
if options.LevelSizeMultiplier != nil {
opts = opts.WithLevelSizeMultiplier(*options.LevelSizeMultiplier)
}

// Truncate indicates whether value log files should be truncated to delete corrupt data, if any.
// This option is ignored when ReadOnly is true.
// The default value of Truncate is false.
opts = opts.WithTruncate(true)

opts = opts.WithLogger(&logger{s.monitor})
s.monitor.Info("opening badger with options %+v", opts)

// Attempt to open the database
db, err := badger.Open(opts)
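Taken together, Open applies Badger's own defaults first, then the preset selected by options.Default, and finally any per-field override, because the pointer checks run after the switch. A minimal sketch of a caller relying on that ordering; this is illustrative only (note these packages are internal to the repository, so outside code cannot actually import them):

```go
package main

import (
	"github.com/kelindar/talaria/internal/config"
	"github.com/kelindar/talaria/internal/monitor"
	"github.com/kelindar/talaria/internal/storage/disk"
)

func main() {
	// The "ingestion" preset turns SyncWrites off, but the explicit pointer
	// field below is applied after the preset, so writes are synced anyway.
	sync := true
	store := disk.New(monitor.NewNoop())
	if err := store.Open("/tmp/talaria", config.Badger{
		Default:    config.BadgerIngestion,
		SyncWrites: &sync,
	}); err != nil {
		panic(err)
	}
	defer store.Close()
}
```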
9 changes: 7 additions & 2 deletions internal/storage/disk/disk_test.go
@@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/kelindar/talaria/internal/config"

"github.com/kelindar/talaria/internal/encoding/key"
"github.com/kelindar/talaria/internal/monitor"
"github.com/stretchr/testify/assert"
@@ -28,7 +30,7 @@ func runTest(t *testing.T, test func(store *Storage)) {
func run(f func(store *Storage)) {
dir, _ := ioutil.TempDir("", "test")
store := New(monitor.NewNoop())
_ = store.Open(dir)
_ = store.Open(dir, config.Badger{})

// Close once we're done and delete data
defer func() { _ = os.RemoveAll(dir) }()
@@ -250,7 +252,10 @@ func populate(store *Storage) {

func TestOpen(t *testing.T) {
assert.NotPanicsf(t, func() {
disk := Open(".", "test-table", monitor.NewNoop())
syncWrites := false
disk := Open(".", "test-table", monitor.NewNoop(), config.Badger{
SyncWrites: &syncWrites,
})
assert.NotNil(t, disk)
disk.Close()
os.RemoveAll("test-table")
3 changes: 2 additions & 1 deletion internal/storage/flush/flush.go
@@ -5,6 +5,7 @@ package flush

import (
"bytes"
"compress/flate"
"sync"
"time"

@@ -68,7 +69,7 @@ func (s *Storage) Merge(blocks []block.Block, schema typeof.Schema) ([]byte, []b
buffer := s.memoryPool.Get().(*bytes.Buffer)
writer, err := eorc.NewWriter(buffer,
eorc.SetSchema(orcSchema),
eorc.SetCompression(eorc.CompressionSnappy{}))
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))

for _, blk := range blocks {
rows, err := blk.Select(blk.Schema())
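Switching the ORC writer from Snappy to Zlib trades CPU for a better compression ratio on flushed blocks. If the write path ever turns out to be CPU-bound, the level is tunable through compress/flate; a sketch of an assumed variant, reusing the buffer and orcSchema from the Merge setup above:

```go
// Hypothetical variant of the writer setup in Merge, trading some compression
// ratio for speed. flate levels run from flate.BestSpeed (1) to
// flate.BestCompression (9); flate.DefaultCompression (-1) lets the library
// pick a middle ground (currently equivalent to level 6).
writer, err := eorc.NewWriter(buffer,
	eorc.SetSchema(orcSchema),
	eorc.SetCompression(eorc.CompressionZlib{Level: flate.BestSpeed}))
```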
5 changes: 3 additions & 2 deletions internal/storage/flush/flush_test.go
@@ -5,6 +5,7 @@ package flush

import (
"bytes"
"compress/flate"
"io/ioutil"
"testing"

@@ -74,7 +75,7 @@ func TestMerge(t *testing.T) {
orcBuffer := &bytes.Buffer{}
writer, _ = eorc.NewWriter(orcBuffer,
eorc.SetSchema(orcSchema),
eorc.SetCompression(eorc.CompressionSnappy{}))
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))
_ = writer.Write("eventName", 1, 1.0)
_ = writer.Write("eventName", 2, 2.0)
_ = writer.Close()
@@ -152,7 +153,7 @@ func TestMerge_DifferentSchema(t *testing.T) {
orcBuffer := &bytes.Buffer{}
writer, _ = eorc.NewWriter(orcBuffer,
eorc.SetSchema(orcSchema2),
eorc.SetCompression(eorc.CompressionSnappy{}))
eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression}))
_ = writer.Write("eventName", 1, 1.0, nil)
_ = writer.Write("eventName", 2, 2.0, "s")
_ = writer.Close()
2 changes: 1 addition & 1 deletion internal/table/log/log.go
@@ -31,7 +31,7 @@ type Table struct {

// New creates a new table implementation.
func New(cfg config.Func, cluster Membership, monitor monitor.Monitor) *Table {
store := disk.Open(cfg().Storage.Directory, cfg().Tables.Log.Name, monitor)
store := disk.Open(cfg().Storage.Directory, cfg().Tables.Log.Name, monitor, cfg().Badger)
base := timeseries.New(cluster, monitor, store, timeseries.Config{
Name: cfg().Tables.Log.Name,
TTL: cfg().Tables.Log.TTL,
14 changes: 7 additions & 7 deletions internal/table/timeseries/timeseries_test.go
@@ -4,6 +4,10 @@
package timeseries_test

import (
"io/ioutil"
"os"
"testing"

"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/typeof"
@@ -12,9 +16,6 @@ import (
"github.com/kelindar/talaria/internal/storage/disk"
"github.com/kelindar/talaria/internal/table/timeseries"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"testing"
)

const testFile2 = "../../../test/test2.orc"
@@ -56,7 +57,7 @@ func TestTimeseries_DynamicSchema(t *testing.T) {
}

monitor := monitor2.NewNoop()
store := disk.Open(dir, timeseriesCfg.Name, monitor)
store := disk.Open(dir, timeseriesCfg.Name, monitor, config.Badger{})

// Start the server and open the database
eventlog := timeseries.New(new(noopMembership), monitor, store, timeseriesCfg)
@@ -147,7 +148,7 @@
}

monitor := monitor2.NewNoop()
store := disk.Open(dir, timeseriesCfg.Name, monitor)
store := disk.Open(dir, timeseriesCfg.Name, monitor, config.Badger{})

// Start the server and open the database
eventlog := timeseries.New(new(noopMembership), monitor, store, timeseriesCfg)
@@ -157,12 +158,11 @@ int1: int64
assert.Nil(t, err)
expectedSchema := typeof.Schema{
"string1": typeof.String,
"int1": typeof.Int64,
"int1": typeof.Int64,
}
assert.Equal(t, expectedSchema, actualSchema)
}


func newSplitQuery(eventName, colName string) *presto.PrestoThriftTupleDomain {
return &presto.PrestoThriftTupleDomain{
Domains: map[string]*presto.PrestoThriftDomain{
Expand Down