Skip to content

Commit

Permalink
Introduce datadog store and persist more informations when using s3 s…
Browse files Browse the repository at this point in the history
…tore.
  • Loading branch information
Mathieu Leduc-Hamel committed Mar 7, 2021
1 parent d299115 commit e066c1d
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 40 deletions.
7 changes: 6 additions & 1 deletion pkg/app/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func (s *Store) Run(ctx context.Context) error {
return fmt.Errorf("Error while unmarshaling value (%s): %w", s.value, err)
}

err = s.store.Persist(context.Background(), time.Now().Unix(), s.kind, s.key, result)
err = s.store.Persist(context.Background(), &store.Data{
Timestamp: time.Now().Unix(),
Name: s.kind,
ID: s.key,
Value: result,
})
if err != nil {
return err
}
Expand Down
13 changes: 4 additions & 9 deletions pkg/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (
"strconv"
"time"

"github.com/DataDog/datadog-go/statsd"
"github.com/rs/zerolog"
)

// Config is the main configuration structure
type Config struct {
httpPort int
logger *zerolog.Logger
statsd *statsd.Client
}

func NewConfig() *Config {
Expand All @@ -26,16 +24,9 @@ func NewConfig() *Config {
panic(err)
}

statsd, err := statsd.New("127.0.0.1:8125", statsd.WithNamespace("trieugene."))

if err != nil {
panic(err)
}

return &Config{
httpPort: httpPort,
logger: &logger,
statsd: statsd,
}
}

Expand Down Expand Up @@ -97,3 +88,7 @@ func (c *Config) S3URL() string {
func (c *Config) S3Region() string {
return GetEnv("TRIEUGENE_S3_REGION", "us-east-1")
}

func (c *Config) StatsdURL() string {
return GetEnv("TRIEUGENE_STATSD_URL", "127.0.0.1:8125")
}
21 changes: 16 additions & 5 deletions pkg/jobs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package jobs

import (
"context"

"github.com/mlhamel/trieugene/pkg/store"
)

type Job interface {
Expand All @@ -16,9 +18,18 @@ type Manager interface {
}

type Message struct {
ProcessedAt int64 `json:"processed_at" mapstructure:"processed_at"`
HappenedAt int64 `json:"happened_at" mapstructure:"happened_at"`
ID string `json:"id"`
Kind string `json:"kind"`
Data string `json:"data"`
ProcessedAt int64 `json:"processed_at" mapstructure:"processed_at"`
HappenedAt int64 `json:"happened_at" mapstructure:"happened_at"`
ID string `json:"id"`
Kind string `json:"kind"`
Value interface{} `json:"value"`
}

func (m *Message) Data() *store.Data {
return &store.Data{
Timestamp: m.HappenedAt,
ID: m.ID,
Name: m.Kind,
Value: m.Value,
}
}
2 changes: 1 addition & 1 deletion pkg/jobs/storejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *StoreJob) Perform(ctx context.Context, args ...interface{}) error {
}

r.cfg.Logger().Debug().Str("id", msg.ID).Int64("HappenedAt", msg.HappenedAt).Msg("Persisting data")
if err := r.store.Persist(ctx, msg.HappenedAt, msg.Kind, msg.ID, msg.Data); err != nil {
if err := r.store.Persist(ctx, msg.Data()); err != nil {
r.cfg.Logger().Error().Err(err).Msg("Error while trying to persist data")
return err
}
Expand Down
47 changes: 47 additions & 0 deletions pkg/store/datadog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package store

import (
"context"
"errors"
"fmt"

"github.com/DataDog/datadog-go/statsd"
"github.com/mlhamel/trieugene/pkg/config"
)

type Datadog struct {
cfg *config.Config
statsd *statsd.Client
}

var ErrInvalidValue = errors.Unwrap(fmt.Errorf("Invalid value for datadog"))

func NewDatadog(cfg *config.Config) Store {
return &Datadog{cfg: cfg}
}

func (d *Datadog) Setup(ctx context.Context) error {
statsd, err := statsd.New(d.cfg.StatsdURL(), statsd.WithNamespace("trieugene."))

if err != nil {
return err
}

d.statsd = statsd

return nil
}

func (d *Datadog) Persist(ctx context.Context, data *Data) error {
key := fmt.Sprintf("%s-%d", data.Name, data.Timestamp)
valueFloat, ok := data.Value.(float64)
if !ok {
return ErrInvalidValue
}
err := d.statsd.Gauge(key, valueFloat, []string{}, 1.0)
if err != nil {
return err
}

return nil
}
4 changes: 2 additions & 2 deletions pkg/store/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (g *GoogleCloudStorage) Setup(ctx context.Context) error {
return g.bucket.Create(ctx, g.cfg.ProjectID(), nil)
}

func (g *GoogleCloudStorage) Persist(ctx context.Context, timestamp int64, name string, id string, data interface{}) error {
func (g *GoogleCloudStorage) Persist(ctx context.Context, data *Data) error {
g.cfg.Logger().Debug().Msgf("Store/GoogleCloudStorage/Persist: Start")
fileName := buildKey(name, timestamp, id)
fileName := buildKey(data.Name, data.Timestamp, data.ID)
object := g.bucket.Object(fileName)
w := object.NewWriter(ctx)
reader := bytes.NewReader([]byte(fmt.Sprintf("%v", data)))
Expand Down
11 changes: 9 additions & 2 deletions pkg/store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ import (
"context"
)

type Data struct {
Timestamp int64 `json:"timestamp"`
ID string `json:"id"`
Name string `json:"name"`
Value interface{} `json:"value"`
}

type Store interface {
Setup(context.Context) error
Persist(context.Context, int64, string, string, interface{}) error
Setup(ctx context.Context) error
Persist(context.Context, *Data) error
}
17 changes: 12 additions & 5 deletions pkg/store/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -68,15 +69,21 @@ func (s *S3) Setup(ctx context.Context) error {
return nil
}

func (s *S3) Persist(ctx context.Context, timestamp int64, name string, id string, data interface{}) error {
datetime := time.Unix(timestamp, 0)
key := fmt.Sprintf("%s/%s/%s.json", name, datetime.Format("20060102"), datetime.Format("1504"))
func (s *S3) Persist(ctx context.Context, data *Data) error {
datetime := time.Unix(data.Timestamp, 0)
key := fmt.Sprintf("%s/%s/%s.json", data.Name, datetime.Format("20060102"), datetime.Format("1504"))
body, err := json.Marshal(data)

if err != nil {
s.cfg.Logger().Error().Err(err).Str("key", key).Msg("Failed marshaling data for persistence")
return err
}

s.cfg.Logger().Debug().Str("key", key).Msg("Starting Persistence")
_, err := s.client.PutObjectWithContext(ctx, &s3.PutObjectInput{
_, err = s.client.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.cfg.S3Bucket()),
Key: aws.String(key),
Body: strings.NewReader(fmt.Sprintf("%v", data)),
Body: strings.NewReader(fmt.Sprintf("%v", body)),
GrantRead: aws.String("GrantRead"),
ContentType: aws.String("application/json"),
})
Expand Down
17 changes: 2 additions & 15 deletions services/rougecombien/pkg/jobs/overflowjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package jobs

import (
"context"
"encoding/json"

"github.com/mlhamel/trieugene/pkg/config"
trieugene "github.com/mlhamel/trieugene/pkg/jobs"
Expand All @@ -15,12 +14,6 @@ type OverflowJob struct {
storejob trieugene.Job
}

type data struct {
Kind string `json:"kind"`
Overflow float64 `json:"overflow"`
HappenedAt int64 `json:"happened_at"`
}

func NewOverflowjob(cfg *config.Config, manager trieugene.Manager, storejob trieugene.Job) trieugene.Job {
return &OverflowJob{
cfg: cfg,
Expand All @@ -34,21 +27,15 @@ func (o *OverflowJob) Kind() string {
}

func (o *OverflowJob) Perform(ctx context.Context, args ...interface{}) error {
scraper.NewScraper(o.cfg, func(ctx context.Context, result scraper.Result) error {
bytes, err := json.Marshal(data{Kind: o.Kind(), Overflow: result.Outflow, HappenedAt: result.TakenAt.Unix()})
if err != nil {
return err
}

return scraper.NewScraper(o.cfg, func(ctx context.Context, result scraper.Result) error {
return o.manager.Perform(o.storejob, &trieugene.Message{
ID: result.Sha1(),
Kind: o.storejob.Kind(),
ProcessedAt: result.ScrapedAt.Unix(),
HappenedAt: result.TakenAt.Unix(),
Data: string(bytes),
Value: result.Outflow,
})
}).Run(ctx)
return nil
}

func (o *OverflowJob) Run(ctx context.Context, args ...interface{}) error {
Expand Down

0 comments on commit e066c1d

Please sign in to comment.