diff --git a/go.mod b/go.mod index 05ccad3e..b671af5c 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/hashicorp/memberlist v0.1.5 github.com/kelindar/binary v1.0.8 - github.com/kelindar/loader v0.0.9 + github.com/kelindar/loader v0.0.10 github.com/kelindar/lua v0.0.5 github.com/miekg/dns v1.1.22 // indirect github.com/myteksi/hystrix-go v1.1.3 @@ -39,7 +39,6 @@ require ( github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.4.0 github.com/twmb/murmur3 v1.1.3 - golang.org/x/net v0.0.0-20200222125558-5a598a2470a0 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e google.golang.org/grpc v1.27.1 gopkg.in/yaml.v2 v2.2.4 diff --git a/go.sum b/go.sum index 1510b9bb..e233029d 100644 --- a/go.sum +++ b/go.sum @@ -194,10 +194,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kelindar/binary v1.0.8 h1:YwahocVzCFY+1+WF5JUqwrYZpe4vcNo7JhWKZA/uBQs= github.com/kelindar/binary v1.0.8/go.mod h1:4zDwr5pQvY3i4xrRd1kC7pcuWvSU/Jbh/v2D0tZUPfE= -github.com/kelindar/loader v0.0.8 h1:vQgNvzIeNF+dXzOeWlOcXkn/wxVv7GKU7CrYyHuY8TI= -github.com/kelindar/loader v0.0.8/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs= -github.com/kelindar/loader v0.0.9 h1:sbSsyVDcRUVqkom2fyRGnkmH8ejr3NVDR2PIINp6VCA= -github.com/kelindar/loader v0.0.9/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs= +github.com/kelindar/loader v0.0.10 h1:af6iHps0yk20BnRnYAOu+hLarZubIgEaZUcbyHCWJE0= +github.com/kelindar/loader v0.0.10/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs= github.com/kelindar/lua v0.0.5 h1:VqJQGl74ub7fiZcSygHV4aZw/IbFKXA6vzk4pEUu4mk= github.com/kelindar/lua v0.0.5/go.mod h1:zCY2muj80Y5c3odwn4Ql6kjzpk3GAq9V4JwuVGLOsdA= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= diff --git a/internal/config/env/configurer.go b/internal/config/env/configurer.go index 799f4527..0d46731e 100644 --- a/internal/config/env/configurer.go +++ b/internal/config/env/configurer.go @@ -51,8 +51,6 @@ func populate(config interface{}, pre string) { reflectType := reflect.TypeOf(config).Elem() reflectValue := reflect.ValueOf(config).Elem() - println(pre, reflectType.String()) - for i := 0; i < reflectType.NumField(); i++ { field := reflectValue.Field(i) name, tagged := reflectType.Field(i).Tag.Lookup("env") diff --git a/internal/config/store.go b/internal/config/store.go index fba38589..40e21aea 100644 --- a/internal/config/store.go +++ b/internal/config/store.go @@ -4,12 +4,12 @@ package config import ( + "context" "log" "sync/atomic" "time" "github.com/grab/async" - "golang.org/x/net/context" ) // store stores the config and reloads it after 5 seconds diff --git a/internal/server/server.go b/internal/server/server.go index e9be6fc6..b293f2a8 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -63,7 +63,7 @@ func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, table continue } - monitor.Info("loaded computed column %v of type %v", c.Name, c.Type) + monitor.Info("server: loaded computed column %v of type %v", c.Name, c.Type) server.computed = append(server.computed, col) } @@ -73,6 +73,7 @@ func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, table // Build a registry of tables for _, table := range tables { + monitor.Info("server: registered %s table...", table.Name()) server.tables[table.Name()] = table } return server @@ -101,6 +102,7 @@ func (s *Server) Listen(ctx context.Context, prestoPort, grpcPort int32) error { // Asynchronously start the gRPC listener async.Invoke(ctx, func(ctx context.Context) (interface{}, error) { + s.monitor.Info("server: listening for grpc on :%d...", grpcPort) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort)) defer lis.Close() if err != nil { @@ -114,6 +116,7 @@ func (s *Server) Listen(ctx context.Context, prestoPort, grpcPort int32) error { }) // Serve presto and block + s.monitor.Info("server: listening for thrift on :%d...", grpcPort) return presto.Serve(ctx, int32(prestoPort), &thriftlog.Service{ Service: s, Monitor: s.monitor, @@ -133,7 +136,7 @@ func (s *Server) pollFromSQS(conf *config.Config) (err error) { } // Start ingesting - s.monitor.Info("starting ingestion from S3/SQS") + s.monitor.Info("server: starting ingestion from S3/SQS...") s.s3sqs.Range(func(v []byte) bool { if _, err := s.Ingest(context.Background(), &talaria.IngestRequest{ Data: &talaria.IngestRequest_Orc{Orc: v}, diff --git a/internal/storage/writer/writer.go b/internal/storage/writer/writer.go index 92f6c475..0759fdce 100644 --- a/internal/storage/writer/writer.go +++ b/internal/storage/writer/writer.go @@ -55,6 +55,7 @@ func New(config *config.Compaction, monitor monitor.Monitor, store storage.Stora } } + monitor.Info("setting up compaction writer %T to run every %.0fs...", writer, interval.Seconds()) flusher := flush.New(monitor, writer, nameFunc) return compact.New(store, flusher, flusher, monitor, interval) } diff --git a/internal/storage/writer/writer_test.go b/internal/storage/writer/writer_test.go index cfcea589..fccfa229 100644 --- a/internal/storage/writer/writer_test.go +++ b/internal/storage/writer/writer_test.go @@ -5,6 +5,8 @@ import ( "github.com/kelindar/talaria/internal/config" "github.com/kelindar/talaria/internal/monitor" + "github.com/kelindar/talaria/internal/monitor/logging" + "github.com/kelindar/talaria/internal/monitor/statsd" "github.com/kelindar/talaria/internal/scripting" "github.com/kelindar/talaria/internal/storage/disk" "github.com/stretchr/testify/assert" @@ -13,7 +15,7 @@ import ( func TestNew(t *testing.T) { cfg := &config.Compaction{} compact := New(cfg, - monitor.NewNoop(), + monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"), disk.New(monitor.NewNoop()), script.NewLoader(nil), ) diff --git a/internal/table/timeseries/timeseries.go b/internal/table/timeseries/timeseries.go index 804af728..98d3ac23 100644 --- a/internal/table/timeseries/timeseries.go +++ b/internal/table/timeseries/timeseries.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "time" + "github.com/kelindar/loader" "github.com/kelindar/talaria/internal/column" "github.com/kelindar/talaria/internal/encoding/block" "github.com/kelindar/talaria/internal/encoding/key" @@ -22,7 +23,6 @@ import ( "github.com/kelindar/talaria/internal/presto" "github.com/kelindar/talaria/internal/storage" "github.com/kelindar/talaria/internal/table" - "github.com/kelindar/loader" ) const ( @@ -40,16 +40,16 @@ type Membership interface { // Table represents a timeseries table. type Table struct { - name string // The name of the table - keyColumn string // The name of the key column - timeColumn string // The name of the time column - ttl time.Duration // The default TTL - store storage.Storage // The storage to use - schema atomic.Value // The latest schema - loader *loader.Loader // The loader used to watch schema updates - cluster Membership // The membership list to use - monitor monitor.Monitor // The monitoring client - staticSchema *typeof.Schema // The static schema of the timeseries table + name string // The name of the table + keyColumn string // The name of the key column + timeColumn string // The name of the time column + ttl time.Duration // The default TTL + store storage.Storage // The storage to use + schema atomic.Value // The latest schema + loader *loader.Loader // The loader used to watch schema updates + cluster Membership // The membership list to use + monitor monitor.Monitor // The monitoring client + staticSchema *typeof.Schema // The static schema of the timeseries table } // Config represents the configuration of the storage @@ -65,14 +65,14 @@ type Config struct { func New(cluster Membership, monitor monitor.Monitor, store storage.Storage, cfg Config) *Table { // Load Schema From Config t := &Table{ - name: cfg.Name, - store: store, - keyColumn: cfg.HashBy, - timeColumn: cfg.SortBy, - ttl: time.Duration(cfg.TTL) * time.Second, - cluster: cluster, - monitor: monitor, - loader: loader.New(), + name: cfg.Name, + store: store, + keyColumn: cfg.HashBy, + timeColumn: cfg.SortBy, + ttl: time.Duration(cfg.TTL) * time.Second, + cluster: cluster, + monitor: monitor, + loader: loader.New(), } t.staticSchema = t.loadStaticSchema(cfg.Schema) @@ -110,11 +110,13 @@ func (t *Table) loadStaticSchema(uriOrSchema string) *typeof.Schema { // Start watching on the URL updates := t.loader.Watch(context.Background(), uriOrSchema, 5*time.Minute) u := <-updates + // Check if given uri has error if u.Err != nil { t.monitor.Warning(errors.Internal("error reading from uri", u.Err)) return nil } + // Check if given schema is malformed err := yaml.Unmarshal(u.Data, &staticSchema) if err != nil { diff --git a/main.go b/main.go index 64d3eded..70fee024 100644 --- a/main.go +++ b/main.go @@ -66,6 +66,7 @@ func main() { }) // Create a storage, if compact store is enabled then use the compact store + monitor.Info("server: opening data directory %s...", conf.Storage.Directory) store := storage.Storage(disk.Open(conf.Storage.Directory, conf.Tables.Timeseries.Name, monitor)) if conf.Storage.Compact != nil { store = writer.New(conf.Storage.Compact, monitor, store, loader) @@ -92,10 +93,11 @@ func main() { }) // Join the cluster + monitor.Info("server: joining cluster on %s...", conf.Domain) gossip.JoinHostname(conf.Domain) // Start listen - monitor.Info("starting talaria server") + monitor.Info("server: starting...") monitor.Count1(logTag, "start") if err := server.Listen(ctx, conf.Readers.Presto.Port, conf.Writers.GRPC.Port); err != nil { panic(err)