Skip to content

Commit

Permalink
Added better initialisation logs (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Apr 29, 2020
1 parent 03920ba commit 108e2a3
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 32 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/memberlist v0.1.5
github.com/kelindar/binary v1.0.8
github.com/kelindar/loader v0.0.9
github.com/kelindar/loader v0.0.10
github.com/kelindar/lua v0.0.5
github.com/miekg/dns v1.1.22 // indirect
github.com/myteksi/hystrix-go v1.1.3
Expand All @@ -39,7 +39,6 @@ require (
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
github.com/twmb/murmur3 v1.1.3
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
google.golang.org/grpc v1.27.1
gopkg.in/yaml.v2 v2.2.4
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kelindar/binary v1.0.8 h1:YwahocVzCFY+1+WF5JUqwrYZpe4vcNo7JhWKZA/uBQs=
github.com/kelindar/binary v1.0.8/go.mod h1:4zDwr5pQvY3i4xrRd1kC7pcuWvSU/Jbh/v2D0tZUPfE=
github.com/kelindar/loader v0.0.8 h1:vQgNvzIeNF+dXzOeWlOcXkn/wxVv7GKU7CrYyHuY8TI=
github.com/kelindar/loader v0.0.8/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs=
github.com/kelindar/loader v0.0.9 h1:sbSsyVDcRUVqkom2fyRGnkmH8ejr3NVDR2PIINp6VCA=
github.com/kelindar/loader v0.0.9/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs=
github.com/kelindar/loader v0.0.10 h1:af6iHps0yk20BnRnYAOu+hLarZubIgEaZUcbyHCWJE0=
github.com/kelindar/loader v0.0.10/go.mod h1:zhlDtwZnZ2I0Pq5tYScI+VWvvhItHGGJT5tLsAY1RQs=
github.com/kelindar/lua v0.0.5 h1:VqJQGl74ub7fiZcSygHV4aZw/IbFKXA6vzk4pEUu4mk=
github.com/kelindar/lua v0.0.5/go.mod h1:zCY2muj80Y5c3odwn4Ql6kjzpk3GAq9V4JwuVGLOsdA=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
Expand Down
2 changes: 0 additions & 2 deletions internal/config/env/configurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func populate(config interface{}, pre string) {
reflectType := reflect.TypeOf(config).Elem()
reflectValue := reflect.ValueOf(config).Elem()

println(pre, reflectType.String())

for i := 0; i < reflectType.NumField(); i++ {
field := reflectValue.Field(i)
name, tagged := reflectType.Field(i).Tag.Lookup("env")
Expand Down
2 changes: 1 addition & 1 deletion internal/config/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package config

import (
"context"
"log"
"sync/atomic"
"time"

"github.com/grab/async"
"golang.org/x/net/context"
)

// store stores the config and reloads it after 5 seconds
Expand Down
7 changes: 5 additions & 2 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, table
continue
}

monitor.Info("loaded computed column %v of type %v", c.Name, c.Type)
monitor.Info("server: loaded computed column %v of type %v", c.Name, c.Type)
server.computed = append(server.computed, col)
}

Expand All @@ -73,6 +73,7 @@ func New(conf config.Func, monitor monitor.Monitor, loader *script.Loader, table

// Build a registry of tables
for _, table := range tables {
monitor.Info("server: registered %s table...", table.Name())
server.tables[table.Name()] = table
}
return server
Expand Down Expand Up @@ -101,6 +102,7 @@ func (s *Server) Listen(ctx context.Context, prestoPort, grpcPort int32) error {

// Asynchronously start the gRPC listener
async.Invoke(ctx, func(ctx context.Context) (interface{}, error) {
s.monitor.Info("server: listening for grpc on :%d...", grpcPort)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort))
defer lis.Close()
if err != nil {
Expand All @@ -114,6 +116,7 @@ func (s *Server) Listen(ctx context.Context, prestoPort, grpcPort int32) error {
})

// Serve presto and block
s.monitor.Info("server: listening for thrift on :%d...", grpcPort)
return presto.Serve(ctx, int32(prestoPort), &thriftlog.Service{
Service: s,
Monitor: s.monitor,
Expand All @@ -133,7 +136,7 @@ func (s *Server) pollFromSQS(conf *config.Config) (err error) {
}

// Start ingesting
s.monitor.Info("starting ingestion from S3/SQS")
s.monitor.Info("server: starting ingestion from S3/SQS...")
s.s3sqs.Range(func(v []byte) bool {
if _, err := s.Ingest(context.Background(), &talaria.IngestRequest{
Data: &talaria.IngestRequest_Orc{Orc: v},
Expand Down
1 change: 1 addition & 0 deletions internal/storage/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func New(config *config.Compaction, monitor monitor.Monitor, store storage.Stora
}
}

monitor.Info("setting up compaction writer %T to run every %.0fs...", writer, interval.Seconds())
flusher := flush.New(monitor, writer, nameFunc)
return compact.New(store, flusher, flusher, monitor, interval)
}
Expand Down
4 changes: 3 additions & 1 deletion internal/storage/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/kelindar/talaria/internal/config"
"github.com/kelindar/talaria/internal/monitor"
"github.com/kelindar/talaria/internal/monitor/logging"
"github.com/kelindar/talaria/internal/monitor/statsd"
"github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/talaria/internal/storage/disk"
"github.com/stretchr/testify/assert"
Expand All @@ -13,7 +15,7 @@ import (
func TestNew(t *testing.T) {
cfg := &config.Compaction{}
compact := New(cfg,
monitor.NewNoop(),
monitor.New(logging.NewStandard(), statsd.NewNoop(), "x", "x"),
disk.New(monitor.NewNoop()),
script.NewLoader(nil),
)
Expand Down
40 changes: 21 additions & 19 deletions internal/table/timeseries/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync/atomic"
"time"

"github.com/kelindar/loader"
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/encoding/block"
"github.com/kelindar/talaria/internal/encoding/key"
Expand All @@ -22,7 +23,6 @@ import (
"github.com/kelindar/talaria/internal/presto"
"github.com/kelindar/talaria/internal/storage"
"github.com/kelindar/talaria/internal/table"
"github.com/kelindar/loader"
)

const (
Expand All @@ -40,16 +40,16 @@ type Membership interface {

// Table represents a timeseries table.
type Table struct {
name string // The name of the table
keyColumn string // The name of the key column
timeColumn string // The name of the time column
ttl time.Duration // The default TTL
store storage.Storage // The storage to use
schema atomic.Value // The latest schema
loader *loader.Loader // The loader used to watch schema updates
cluster Membership // The membership list to use
monitor monitor.Monitor // The monitoring client
staticSchema *typeof.Schema // The static schema of the timeseries table
name string // The name of the table
keyColumn string // The name of the key column
timeColumn string // The name of the time column
ttl time.Duration // The default TTL
store storage.Storage // The storage to use
schema atomic.Value // The latest schema
loader *loader.Loader // The loader used to watch schema updates
cluster Membership // The membership list to use
monitor monitor.Monitor // The monitoring client
staticSchema *typeof.Schema // The static schema of the timeseries table
}

// Config represents the configuration of the storage
Expand All @@ -65,14 +65,14 @@ type Config struct {
func New(cluster Membership, monitor monitor.Monitor, store storage.Storage, cfg Config) *Table {
// Load Schema From Config
t := &Table{
name: cfg.Name,
store: store,
keyColumn: cfg.HashBy,
timeColumn: cfg.SortBy,
ttl: time.Duration(cfg.TTL) * time.Second,
cluster: cluster,
monitor: monitor,
loader: loader.New(),
name: cfg.Name,
store: store,
keyColumn: cfg.HashBy,
timeColumn: cfg.SortBy,
ttl: time.Duration(cfg.TTL) * time.Second,
cluster: cluster,
monitor: monitor,
loader: loader.New(),
}

t.staticSchema = t.loadStaticSchema(cfg.Schema)
Expand Down Expand Up @@ -110,11 +110,13 @@ func (t *Table) loadStaticSchema(uriOrSchema string) *typeof.Schema {
// Start watching on the URL
updates := t.loader.Watch(context.Background(), uriOrSchema, 5*time.Minute)
u := <-updates

// Check if given uri has error
if u.Err != nil {
t.monitor.Warning(errors.Internal("error reading from uri", u.Err))
return nil
}

// Check if given schema is malformed
err := yaml.Unmarshal(u.Data, &staticSchema)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func main() {
})

// Create a storage, if compact store is enabled then use the compact store
monitor.Info("server: opening data directory %s...", conf.Storage.Directory)
store := storage.Storage(disk.Open(conf.Storage.Directory, conf.Tables.Timeseries.Name, monitor))
if conf.Storage.Compact != nil {
store = writer.New(conf.Storage.Compact, monitor, store, loader)
Expand All @@ -92,10 +93,11 @@ func main() {
})

// Join the cluster
monitor.Info("server: joining cluster on %s...", conf.Domain)
gossip.JoinHostname(conf.Domain)

// Start listen
monitor.Info("starting talaria server")
monitor.Info("server: starting...")
monitor.Count1(logTag, "start")
if err := server.Listen(ctx, conf.Readers.Presto.Port, conf.Writers.GRPC.Port); err != nil {
panic(err)
Expand Down

0 comments on commit 108e2a3

Please sign in to comment.