diff --git a/README.md b/README.md index 65c4aa21..3a93a383 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,7 @@ We are open to contributions, feel free to submit a pull request and we'll revie * [Roman Atachiants](https://www.linkedin.com/in/atachiants/) * [Yichao Wang](https://www.linkedin.com/in/wangyichao/) * [Chun Rong Phang](https://www.linkedin.com/in/phang-chun-rong-6232ab78/) +* [Ankit Kumar Sinha](https://www.linkedin.com/in/ankit-kumar-sinha-805359b6/) ## License diff --git a/go.mod b/go.mod index a16847a8..3488ac16 100644 --- a/go.mod +++ b/go.mod @@ -16,14 +16,16 @@ require ( github.com/armon/go-metrics v0.3.3 // indirect github.com/aws/aws-sdk-go v1.30.25 github.com/crphang/orc v0.0.3 - github.com/dgraph-io/badger/v2 v2.0.3 + github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798 github.com/dgraph-io/ristretto v0.0.2 // indirect github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dnaeon/go-vcr v1.0.1 // indirect github.com/emitter-io/address v1.0.0 github.com/gogo/protobuf v1.3.1 + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/protobuf v1.4.1 // indirect github.com/golang/snappy v0.0.1 + github.com/google/btree v1.0.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d // indirect github.com/gorilla/mux v1.7.4 github.com/grab/async v0.0.5 @@ -39,6 +41,8 @@ require ( github.com/kelindar/loader v0.0.10 github.com/kelindar/lua v0.0.6 github.com/miekg/dns v1.1.29 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/myteksi/hystrix-go v1.1.3 github.com/samuel/go-thrift v0.0.0-20191111193933-5165175b40af github.com/satori/go.uuid v1.2.0 // indirect diff --git a/go.sum b/go.sum index 416f6dee..d0613a45 100644 --- a/go.sum +++ b/go.sum @@ -114,8 +114,8 @@ github.com/crphang/orc v0.0.3/go.mod 
h1:+siY09J77eYa8+0+UXZxeyv8nrmhvOY40DbqB7gR github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/badger/v2 v2.0.3 h1:inzdf6VF/NZ+tJ8RwwYMjJMvsOALTHYdozn0qSl6XJI= -github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798 h1:PcjL/wjye6pw6IZK1SEKqp+wPpBm+YFx8gO7lBFArW8= +github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200626160443-3042e3776798/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= diff --git a/internal/config/config.go b/internal/config/config.go index 529c9ae0..03adb328 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,6 +10,14 @@ import ( "github.com/kelindar/talaria/internal/encoding/typeof" ) +type BadgerDefault string + +const ( + BadgerStorage BadgerDefault = "storage" + BadgerIngestion BadgerDefault = "ingestion" + BadgerDefaultOption BadgerDefault = "default" +) + // Config global type Config struct { URI string `json:"uri" yaml:"uri" env:"URI"` @@ -23,6 +31,19 @@ type Config struct { Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"` Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"` K8s *K8s `json:"k8s,omitempty" yaml:"k8s" env:"K8S"` + Badger Badger `json:"badger,omitempty" yaml:"badger" env:"BADGER"` +} + +// This configuration is related to the badger K-V store that we use underlying. 
+// This will help to tune the options to optimize for various use cases like bigger files, point queries or range queries. +type Badger struct { + SyncWrites *bool `json:"syncWrites" yaml:"syncWrites" env:"SYNCWRITES"` // Whether to sync writes to disk before ack. defaults to true + ValueLogMaxEntries *uint32 `json:"valueLogMaxEntries" yaml:"valueLogMaxEntries" env:"VALUELOGMAXENTRIES"` // Maximum number of entries a value log file can hold approximately. defaults to 5000 + MaxTableSize *int64 `json:"maxTableSize" yaml:"maxTableSize" env:"MAXTABLESIZE"` // Maximum size in bytes for each LSM table or file. + LevelOneSize *int64 `json:"levelOneSize" yaml:"levelOneSize" env:"LEVELONESIZE"` // Maximum total size in bytes for Level 1. defaults to 1 Million + LevelSizeMultiplier *int `json:"levelSizeMultiplier" yaml:"levelSizeMultiplier" env:"LEVELSIZEMULTIPLIER"` // The ratio between the maximum sizes of contiguous levels in the LSM. defaults to 10 + MaxLevels *int `json:"maxLevels" yaml:"maxLevels" env:"MAXLEVELS"` // Maximum number of levels of compaction allowed in the LSM. 
defaults to 7 + Default BadgerDefault `json:"default" yaml:"default" env:"DEFAULT"` // default badger option to optimize for storage, ingestion or default that badger provides } type K8s struct { diff --git a/internal/server/server_ingest.go b/internal/server/server_ingest.go index 8ce447de..435c49da 100644 --- a/internal/server/server_ingest.go +++ b/internal/server/server_ingest.go @@ -40,5 +40,6 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t } } + s.monitor.Count("server", "ingestCount", int64(len(blocks))) return nil, nil } diff --git a/internal/storage/compact/compact.go b/internal/storage/compact/compact.go index 54e4a33e..b70a474c 100644 --- a/internal/storage/compact/compact.go +++ b/internal/storage/compact/compact.go @@ -82,6 +82,7 @@ func (s *Storage) Delete(keys ...key.Key) error { // Compact runs the compaction on the storage func (s *Storage) Compact(ctx context.Context) (interface{}, error) { + st := time.Now() var hash uint32 var blocks []block.Block var merged []key.Key @@ -134,7 +135,9 @@ func (s *Storage) Compact(ctx context.Context) (interface{}, error) { // Wait for the pool to be close close(queue) - return wpool.Outcome() + out, err := wpool.Outcome() + s.monitor.Histogram(ctxTag, "compactlatency", float64(time.Since(st))) + return out, err } // merge adds an key-value pair to the underlying database @@ -163,11 +166,14 @@ func (s *Storage) merge(keys []key.Key, blocks []block.Block, schema typeof.Sche } } + start := time.Now() // Delete all of the keys that we have appended if err = s.buffer.Delete(keys...); err != nil { s.monitor.Count1(ctxTag, "error", "type:delete") - s.monitor.Error(err) + s.monitor.Error(errors.Internal("merge error %s", err)) } + s.monitor.Histogram(ctxTag, "deletelatency", float64(time.Since(start))) + s.monitor.Count(ctxTag, "deleteCount", int64(len(keys))) return }) } diff --git a/internal/storage/compact/compact_test.go b/internal/storage/compact/compact_test.go index 
77c487b6..8acaa8b1 100644 --- a/internal/storage/compact/compact_test.go +++ b/internal/storage/compact/compact_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/kelindar/talaria/internal/config" + "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" "github.com/kelindar/talaria/internal/encoding/typeof" @@ -60,7 +62,10 @@ func runTest(t *testing.T, test func(store *disk.Storage)) { func run(f func(store *disk.Storage)) { dir, _ := ioutil.TempDir("", "test") store := disk.New(monitor.NewNoop()) - _ = store.Open(dir) + syncWrite := false + _ = store.Open(dir, config.Badger{ + SyncWrites: &syncWrite, + }) // Close once we're done and delete data defer func() { _ = os.RemoveAll(dir) }() diff --git a/internal/storage/disk/disk.go b/internal/storage/disk/disk.go index 4caa5be5..fc27f41a 100644 --- a/internal/storage/disk/disk.go +++ b/internal/storage/disk/disk.go @@ -16,6 +16,7 @@ import ( "github.com/dgraph-io/badger/v2" "github.com/grab/async" + "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/encoding/key" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/errors" @@ -46,10 +47,10 @@ func New(m monitor.Monitor) *Storage { } // Open creates a disk storage and open the directory -func Open(dir string, name string, monitor monitor.Monitor) *Storage { +func Open(dir string, name string, monitor monitor.Monitor, options config.Badger) *Storage { diskStorage := New(monitor) tableDir := path.Join(dir, name) - err := diskStorage.Open(tableDir) + err := diskStorage.Open(tableDir, options) if err != nil { panic(err) } @@ -57,7 +58,7 @@ func Open(dir string, name string, monitor monitor.Monitor) *Storage { } // Open opens a directory. 
-func (s *Storage) Open(dir string) error { +func (s *Storage) Open(dir string, options config.Badger) error { // Default to a /data directory if dir == "" { @@ -69,16 +70,61 @@ func (s *Storage) Open(dir string) error { return err } - // Create the options opts := badger.DefaultOptions(dir) - opts.SyncWrites = false - opts.MaxTableSize = 64 << 15 - opts.ValueLogMaxEntries = 5000 - opts.LevelOneSize = 1 << 16 - opts.LevelSizeMultiplier = 3 - opts.MaxLevels = 25 - opts.Truncate = true - opts.Logger = &logger{s.monitor} + + switch options.Default { + case config.BadgerStorage: + opts = opts.WithMaxTableSize(64 << 15).WithValueLogMaxEntries(5000).WithLevelOneSize(1 << 16).WithLevelSizeMultiplier(3).WithMaxLevels(25).WithSyncWrites(false) + case config.BadgerIngestion: + opts = opts.WithLevelOneSize(204800000).WithMaxLevels(3).WithSyncWrites(false) + } + + // Create the options + if options.SyncWrites != nil { + opts = opts.WithSyncWrites(*options.SyncWrites) + } + + // max size of lsm tree in bytes after which data is propagated to disk. + // The default is 64 MB. + if options.MaxTableSize != nil { + opts = opts.WithMaxTableSize(*options.MaxTableSize) + } + + // ValueLogMaxEntries sets the maximum number of entries a value log file can hold approximately. + // An actual size limit of a value log file is the minimum of ValueLogFileSize and + // ValueLogMaxEntries. + // The default value of ValueLogMaxEntries is 1 Million + if options.ValueLogMaxEntries != nil { + opts = opts.WithValueLogMaxEntries(*options.ValueLogMaxEntries) + } + + // LevelOneSize sets the maximum total size for Level 1. + // The default value of LevelOneSize is 20MB. + if options.LevelOneSize != nil { + opts = opts.WithLevelOneSize(*options.LevelOneSize) + } + + // Maximum number of levels of compaction allowed in the LSM. + // The default value of MaxLevels is 7. 
+ if options.MaxLevels != nil { + opts = opts.WithMaxLevels(*options.MaxLevels) + } + + // LevelSizeMultiplier sets the ratio between the maximum sizes of contiguous levels in the LSM. + // Once a level grows to be larger than this ratio allowed, the compaction process will be + // triggered. + // The default value of LevelSizeMultiplier is 10. + if options.LevelSizeMultiplier != nil { + opts = opts.WithLevelSizeMultiplier(*options.LevelSizeMultiplier) + } + + // Truncate indicates whether value log files should be truncated to delete corrupt data, if any. + // This option is ignored when ReadOnly is true. + // The default value of Truncate is false. + opts = opts.WithTruncate(true) + + opts = opts.WithLogger(&logger{s.monitor}) + s.monitor.Info("opening badger with options %+v", opts) // Attempt to open the database db, err := badger.Open(opts) diff --git a/internal/storage/disk/disk_test.go b/internal/storage/disk/disk_test.go index d0b1dac4..e278c327 100644 --- a/internal/storage/disk/disk_test.go +++ b/internal/storage/disk/disk_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/kelindar/talaria/internal/config" + "github.com/kelindar/talaria/internal/encoding/key" "github.com/kelindar/talaria/internal/monitor" "github.com/stretchr/testify/assert" @@ -28,7 +30,7 @@ func runTest(t *testing.T, test func(store *Storage)) { func run(f func(store *Storage)) { dir, _ := ioutil.TempDir("", "test") store := New(monitor.NewNoop()) - _ = store.Open(dir) + _ = store.Open(dir, config.Badger{}) // Close once we're done and delete data defer func() { _ = os.RemoveAll(dir) }() @@ -250,7 +252,10 @@ func populate(store *Storage) { func TestOpen(t *testing.T) { assert.NotPanicsf(t, func() { - disk := Open(".", "test-table", monitor.NewNoop()) + syncWrites := false + disk := Open(".", "test-table", monitor.NewNoop(), config.Badger{ + SyncWrites: &syncWrites, + }) assert.NotNil(t, disk) disk.Close() os.RemoveAll("test-table") diff --git 
a/internal/storage/flush/flush.go b/internal/storage/flush/flush.go index b6e65fe6..f875e685 100644 --- a/internal/storage/flush/flush.go +++ b/internal/storage/flush/flush.go @@ -5,6 +5,7 @@ package flush import ( "bytes" + "compress/flate" "sync" "time" @@ -68,7 +69,7 @@ func (s *Storage) Merge(blocks []block.Block, schema typeof.Schema) ([]byte, []b buffer := s.memoryPool.Get().(*bytes.Buffer) writer, err := eorc.NewWriter(buffer, eorc.SetSchema(orcSchema), - eorc.SetCompression(eorc.CompressionSnappy{})) + eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression})) for _, blk := range blocks { rows, err := blk.Select(blk.Schema()) diff --git a/internal/storage/flush/flush_test.go b/internal/storage/flush/flush_test.go index 4521f08d..39f65de6 100644 --- a/internal/storage/flush/flush_test.go +++ b/internal/storage/flush/flush_test.go @@ -5,6 +5,7 @@ package flush import ( "bytes" + "compress/flate" "io/ioutil" "testing" @@ -74,7 +75,7 @@ func TestMerge(t *testing.T) { orcBuffer := &bytes.Buffer{} writer, _ = eorc.NewWriter(orcBuffer, eorc.SetSchema(orcSchema), - eorc.SetCompression(eorc.CompressionSnappy{})) + eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression})) _ = writer.Write("eventName", 1, 1.0) _ = writer.Write("eventName", 2, 2.0) _ = writer.Close() @@ -152,7 +153,7 @@ func TestMerge_DifferentSchema(t *testing.T) { orcBuffer := &bytes.Buffer{} writer, _ = eorc.NewWriter(orcBuffer, eorc.SetSchema(orcSchema2), - eorc.SetCompression(eorc.CompressionSnappy{})) + eorc.SetCompression(eorc.CompressionZlib{Level: flate.DefaultCompression})) _ = writer.Write("eventName", 1, 1.0, nil) _ = writer.Write("eventName", 2, 2.0, "s") _ = writer.Close() diff --git a/internal/table/log/log.go b/internal/table/log/log.go index 4e625c1b..dcb90a37 100644 --- a/internal/table/log/log.go +++ b/internal/table/log/log.go @@ -31,7 +31,7 @@ type Table struct { // New creates a new table implementation. 
func New(cfg config.Func, cluster Membership, monitor monitor.Monitor) *Table { - store := disk.Open(cfg().Storage.Directory, cfg().Tables.Log.Name, monitor) + store := disk.Open(cfg().Storage.Directory, cfg().Tables.Log.Name, monitor, cfg().Badger) base := timeseries.New(cluster, monitor, store, timeseries.Config{ Name: cfg().Tables.Log.Name, TTL: cfg().Tables.Log.TTL, diff --git a/internal/table/timeseries/timeseries_test.go b/internal/table/timeseries/timeseries_test.go index ea0dfb9a..4e0530db 100644 --- a/internal/table/timeseries/timeseries_test.go +++ b/internal/table/timeseries/timeseries_test.go @@ -4,6 +4,10 @@ package timeseries_test import ( + "io/ioutil" + "os" + "testing" + "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/typeof" @@ -12,9 +16,6 @@ import ( "github.com/kelindar/talaria/internal/storage/disk" "github.com/kelindar/talaria/internal/table/timeseries" "github.com/stretchr/testify/assert" - "io/ioutil" - "os" - "testing" ) const testFile2 = "../../../test/test2.orc" @@ -56,7 +57,7 @@ func TestTimeseries_DynamicSchema(t *testing.T) { } monitor := monitor2.NewNoop() - store := disk.Open(dir, timeseriesCfg.Name, monitor) + store := disk.Open(dir, timeseriesCfg.Name, monitor, config.Badger{}) // Start the server and open the database eventlog := timeseries.New(new(noopMembership), monitor, store, timeseriesCfg) @@ -147,7 +148,7 @@ int1: int64 } monitor := monitor2.NewNoop() - store := disk.Open(dir, timeseriesCfg.Name, monitor) + store := disk.Open(dir, timeseriesCfg.Name, monitor, config.Badger{}) // Start the server and open the database eventlog := timeseries.New(new(noopMembership), monitor, store, timeseriesCfg) @@ -157,12 +158,11 @@ int1: int64 assert.Nil(t, err) expectedSchema := typeof.Schema{ "string1": typeof.String, - "int1": typeof.Int64, + "int1": typeof.Int64, } assert.Equal(t, expectedSchema, actualSchema) } - func 
newSplitQuery(eventName, colName string) *presto.PrestoThriftTupleDomain { return &presto.PrestoThriftTupleDomain{ Domains: map[string]*presto.PrestoThriftDomain{ diff --git a/main.go b/main.go index b470d51f..b9278f78 100644 --- a/main.go +++ b/main.go @@ -7,13 +7,13 @@ import ( "context" "fmt" "net/http" + _ "net/http/pprof" "os" "os/signal" "syscall" "time" "github.com/gorilla/mux" - "github.com/kelindar/lua" "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/config/env" @@ -22,7 +22,7 @@ import ( "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" "github.com/kelindar/talaria/internal/monitor/statsd" - "github.com/kelindar/talaria/internal/scripting" + script "github.com/kelindar/talaria/internal/scripting" mlog "github.com/kelindar/talaria/internal/scripting/log" mnet "github.com/kelindar/talaria/internal/scripting/net" mstats "github.com/kelindar/talaria/internal/scripting/stats" @@ -73,7 +73,7 @@ func main() { // Create a storage, if compact store is enabled then use the compact store monitor.Info("server: opening data directory %s...", conf.Storage.Directory) - store := storage.Storage(disk.Open(conf.Storage.Directory, conf.Tables.Timeseries.Name, monitor)) + store := storage.Storage(disk.Open(conf.Storage.Directory, conf.Tables.Timeseries.Name, monitor, conf.Badger)) if conf.Storage.Compact != nil { store = writer.New(conf.Storage.Compact, monitor, store, loader) } @@ -107,7 +107,7 @@ func main() { startHTTPServerAsync(conf.K8s.ProbePort) } - // Start listen + // Start listenHandler monitor.Info("server: starting...") monitor.Count1(logTag, "start") if err := server.Listen(ctx, conf.Readers.Presto.Port, conf.Writers.GRPC.Port); err != nil { @@ -132,6 +132,7 @@ func startHTTPServerAsync(portNum int32) { handler.HandleFunc("/healthz", func(resp http.ResponseWriter, req *http.Request) { _, _ = resp.Write([]byte(`talaria-health-check`)) }).Methods(http.MethodGet, 
http.MethodHead) + handler.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) server := &http.Server{ Addr: fmt.Sprintf(":%d", portNum), diff --git a/test/bench_test.go b/test/bench_test.go index 1db21ceb..3c3beb1c 100644 --- a/test/bench_test.go +++ b/test/bench_test.go @@ -13,7 +13,7 @@ import ( "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/presto" - "github.com/kelindar/talaria/internal/scripting" + script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/server" "github.com/kelindar/talaria/internal/storage/disk" "github.com/kelindar/talaria/internal/table/timeseries" @@ -66,7 +66,7 @@ func BenchmarkQuery(b *testing.B) { SortBy: cfg().Tables.Timeseries.SortBy, Schema: "", } - store := disk.Open(cfg().Storage.Directory, timeseriesCfg.Name, monitor) + store := disk.Open(cfg().Storage.Directory, timeseriesCfg.Name, monitor, cfg().Badger) // Start the server and open the database server := server.New(cfg, monitor, script.NewLoader(nil), diff --git a/test/local.go b/test/local.go index 2d4bac93..c321e117 100644 --- a/test/local.go +++ b/test/local.go @@ -15,7 +15,7 @@ import ( "github.com/kelindar/talaria/internal/monitor" "github.com/kelindar/talaria/internal/monitor/logging" "github.com/kelindar/talaria/internal/monitor/statsd" - "github.com/kelindar/talaria/internal/scripting" + script "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/server" "github.com/kelindar/talaria/internal/server/cluster" "github.com/kelindar/talaria/internal/storage/disk" @@ -65,7 +65,7 @@ func main() { gossip := cluster.New(7946) gossip.JoinHostname("localhost") - store := disk.Open(cfg().Storage.Directory, cfg().Tables.Timeseries.Name, monitor) + store := disk.Open(cfg().Storage.Directory, cfg().Tables.Timeseries.Name, monitor, config.Badger{}) // Start the server and open the database eventlog := timeseries.New(gossip, 
monitor, store, timeseries.Config{