Skip to content

Commit

Permalink
introduce proper csv persistence per day for rougecombien service.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Leduc-Hamel committed Apr 3, 2021
1 parent f9e2292 commit 5bfd4cf
Show file tree
Hide file tree
Showing 18 changed files with 452 additions and 183 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/contribsys/faktory v1.3.0-1
github.com/contribsys/faktory_worker_go v1.4.2
github.com/go-co-op/gocron v0.6.0
github.com/gocarina/gocsv v0.0.0-20210326111627-0340a0229e98
github.com/gorilla/mux v1.8.0
github.com/mitchellh/mapstructure v1.4.1
github.com/pior/runnable v0.8.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4 h1:Q7s2AN3DhFJKOnzO0uTKLhJTfXTEcXcvw5ylf2BHJw4=
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
github.com/gocarina/gocsv v0.0.0-20210326111627-0340a0229e98 h1:kmuCASO9n1dN4NLlOP7zx9svH8oIZiKB1tf6/rUAiyY=
github.com/gocarina/gocsv v0.0.0-20210326111627-0340a0229e98/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down
11 changes: 8 additions & 3 deletions pkg/app/faktory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ type Faktory struct {
}

func NewFaktory(cfg *config.Config, store store.Store) runnable.Runnable {
storeJob := jobs.NewStoreJob("store-rougecombien", cfg, store)
jsonStoreJob := jobs.NewJsonStoreJob("json-store-rougecombien", cfg, store)
csvStoreJob := jobs.NewCsvStoreJob("csv-store-rougecombien", cfg, store)

manager := jobs.NewFaktoryManager(cfg)
manager.Register(storeJob)
manager.Register(rougecombien.NewJsonJob(cfg, manager, storeJob))

manager.Register(jsonStoreJob)
manager.Register(csvStoreJob)
manager.Register(rougecombien.NewJsonJob(cfg, manager, jsonStoreJob))
manager.Register(rougecombien.NewCsvJob(cfg, manager, csvStoreJob))

return &Faktory{
cfg: cfg,
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/trieugene.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (t *trieugene) Run(ctx context.Context) error {
}

func (t *trieugeneDev) Run(ctx context.Context) error {
store := store.NewLocal(t.cfg.LocalPrefix())
store := store.NewLocal(t.cfg)

run(setupDevelopment(t.cfg))

Expand Down
88 changes: 88 additions & 0 deletions pkg/jobs/csv_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package jobs

import (
"context"
"fmt"
"time"

"github.com/gocarina/gocsv"
"github.com/mitchellh/mapstructure"
"github.com/mlhamel/trieugene/pkg/config"
"github.com/mlhamel/trieugene/pkg/store"
)

type CsvStoreJob struct {
kind string
cfg *config.Config
store store.Store
}

func NewCsvStoreJob(kind string, cfg *config.Config, store store.Store) Job {
return &CsvStoreJob{
kind: kind,
cfg: cfg,
store: store,
}
}

func (c *CsvStoreJob) Kind() string {
return c.kind
}

func (c *CsvStoreJob) Perform(ctx context.Context, args ...interface{}) error {
c.cfg.Logger().Debug().Str("job", "CsvStoreJob").Msgf("Running with %d messages", len(args))
messages := []*Message{}

for a := range args {

c.cfg.Logger().Debug().Str("job", "CsvStoreJob").Msg("Parsing arguments")
data, ok := args[a].([]interface{})
if !ok {
c.cfg.Logger().Error().Err(ErrInvalidMsg).Msg("Error while parsing args")
return ErrInvalidMsg
}
c.cfg.Logger().Debug().Str("job", "CsvStoreJob").Int("length", len(data)).Msg("Succeed: Parsing arguments")

for b := range data {
var msg Message
err := mapstructure.Decode(data[b], &msg)
if err != nil {
c.cfg.Logger().Error().Str("job", "CsvStoreJob").Err(err).Msg("Error while decoding msg")
return err
}

messages = append(messages, &msg)
}
}

var first = messages[0]

datetime := time.Unix(first.HappenedAt, 0)
filename := fmt.Sprintf("%s/%s.csv", first.Kind, datetime.Format("20060102"))
body, err := gocsv.MarshalString(&messages)

if err != nil {
return fmt.Errorf("Error while marshaling into a csv: %w", err)
}

c.cfg.Logger().Debug().Str("filename", filename).Msg("Persisting csv")
if err := c.store.Persist(ctx, filename, body); err != nil {
c.cfg.Logger().Error().Err(ErrInvalidData).Msg("Failed at persisting csv")
return ErrInvalidData
}

return nil
}

func (c *CsvStoreJob) Run(ctx context.Context, args ...interface{}) error {
if err := c.store.Setup(ctx); err != nil {
c.cfg.Logger().Error().Err(err).Msg("An occured while setuping store")
return err
}
if err := c.Perform(ctx, args); err != nil {
c.cfg.Logger().Error().Err(err).Msg("An occured while running CsvStoreJob")
return err
}
c.cfg.Logger().Debug().Msg("Succeed running CsvStoreJob")
return nil
}
6 changes: 3 additions & 3 deletions pkg/jobs/faktory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func (f *FaktoryManager) Register(job Job) error {
return nil
}

func (f *FaktoryManager) Perform(job Job, msg *Message) error {
f.cfg.Logger().Debug().Msgf("Instanciating job %s with args %v", job.Kind(), msg)
func (f *FaktoryManager) Perform(job Job, msgs ...*Message) error {
f.cfg.Logger().Debug().Msgf("Instanciating job %s", job.Kind())

instance := faktory.NewJob(job.Kind(), msg)
instance := faktory.NewJob(job.Kind(), msgs)
client, err := f.faktoryClientInstance()
if err != nil {
return err
Expand Down
35 changes: 29 additions & 6 deletions pkg/jobs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package jobs

import (
"context"

"github.com/mitchellh/mapstructure"
)

type Job interface {
Expand All @@ -11,14 +13,35 @@ type Job interface {

type Manager interface {
Register(Job) error
Perform(Job, *Message) error
Perform(Job, ...*Message) error
Run(context.Context) error
}

type Message struct {
ProcessedAt int64 `json:"processed_at" mapstructure:"processed_at"`
HappenedAt int64 `json:"happened_at" mapstructure:"happened_at"`
ID string `json:"id"`
Kind string `json:"kind"`
Value interface{} `json:"value"`
ProcessedAt int64 `json:"processed_at" mapstructure:"processed_at" csv:"processed_at"`
HappenedAt int64 `json:"happened_at" mapstructure:"happened_at" csv:"happened_at"`
ID string `json:"id" csv:"id"`
Kind string `json:"kind" csv:"kind"`
Value interface{} `json:"value" csv:"value"`
}

func NewMessageFromArg(arg interface{}) (*Message, error) {
var msg Message

data, ok := arg.([]interface{})
if !ok {
return nil, ErrInvalidMsg
}

raw, ok := data[0].(map[string]interface{})
if !ok {
return nil, ErrInvalidMsg
}

err := mapstructure.Decode(raw, &msg)
if err != nil {
return nil, err
}

return &msg, nil
}
39 changes: 12 additions & 27 deletions pkg/jobs/storejob.go → pkg/jobs/json_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,47 @@ import (
"fmt"
"time"

"github.com/mitchellh/mapstructure"
"github.com/mlhamel/trieugene/pkg/config"
"github.com/mlhamel/trieugene/pkg/store"
)

type StoreJob struct {
type JsonStoreJob struct {
kind string
cfg *config.Config
store store.Store
}

func NewStoreJob(kind string, cfg *config.Config, store store.Store) Job {
return &StoreJob{
func NewJsonStoreJob(kind string, cfg *config.Config, store store.Store) Job {
return &JsonStoreJob{
kind: kind,
cfg: cfg,
store: store,
}
}

func (r *StoreJob) Kind() string {
func (r *JsonStoreJob) Kind() string {
return r.kind
}

type dataTemp struct {
data []interface{}
}

func (r *StoreJob) Perform(ctx context.Context, args ...interface{}) error {
r.cfg.Logger().Debug().Msgf("Running StoreJob with args %v", args)
func (r *JsonStoreJob) Perform(ctx context.Context, args ...interface{}) error {
r.cfg.Logger().Debug().Msgf("Running JsonStoreJob with args %v", args)

for a := range args {
var msg Message
data, ok := args[a].([]interface{})
if !ok {
r.cfg.Logger().Error().Err(ErrInvalidMsg).Msg("Invalid message")
return ErrInvalidMsg
}

raw, ok := data[0].(map[string]interface{})
if !ok {
r.cfg.Logger().Error().Err(ErrInvalidMsg).Msg("Invalid message")
return ErrInvalidMsg
}
msg, err := NewMessageFromArg(args[a])

r.cfg.Logger().Debug().Interface("Raw", raw).Msg("Decoding arguments")
err := mapstructure.Decode(raw, &msg)
if err != nil {
r.cfg.Logger().Error().Err(err).Msg("Error while trying to decode arguments")
return err
}

r.cfg.Logger().Debug().Str("id", msg.ID).Int64("HappenedAt", msg.HappenedAt).Msg("Persisting data")

datetime := time.Unix(msg.HappenedAt, 0)
filename := fmt.Sprintf("%s/%s/%s.json", msg.Kind, datetime.Format("20060102"), datetime.Format("1504"))
body, err := json.Marshal(data)
body, err := json.Marshal(msg)

if err != nil {
return err
Expand All @@ -75,19 +60,19 @@ func (r *StoreJob) Perform(ctx context.Context, args ...interface{}) error {
}
}

r.cfg.Logger().Debug().Interface("args", args).Msg("Done processing StoreJob")
r.cfg.Logger().Debug().Interface("args", args).Msg("Done processing JsonStoreJob")
return nil
}

func (r *StoreJob) Run(ctx context.Context, args ...interface{}) error {
func (r *JsonStoreJob) Run(ctx context.Context, args ...interface{}) error {
if err := r.store.Setup(ctx); err != nil {
r.cfg.Logger().Error().Err(err).Msg("An occured while setuping store")
return err
}
if err := r.Perform(ctx, args); err != nil {
r.cfg.Logger().Error().Err(err).Msg("An occured while running StoreJob")
r.cfg.Logger().Error().Err(err).Msg("An occured while running JsonStoreJob")
return err
}
r.cfg.Logger().Debug().Msg("Succeed running StoreJob")
r.cfg.Logger().Debug().Msg("Succeed running JsonStoreJob")
return nil
}
15 changes: 10 additions & 5 deletions pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,41 @@ import (
"io/ioutil"
"os"
"path"

"github.com/mlhamel/trieugene/pkg/config"
)

type Local struct {
cfg *config.Config
prefix string
}

func NewLocal(prefix string) Store {
return &Local{prefix: prefix}
func NewLocal(cfg *config.Config) Store {
return &Local{cfg: cfg}
}

func (l *Local) Setup(ctx context.Context) error {
err := os.MkdirAll(l.prefix, os.ModePerm)
err := os.MkdirAll(l.cfg.LocalPrefix(), os.ModePerm)
if err != nil {
return err
}
return nil
}

func (s *Local) Persist(ctx context.Context, filename string, data string) error {
func (l *Local) Persist(ctx context.Context, filename string, data string) error {
dataAsByte := []byte(data)

filenameWithPrefix := path.Join(s.prefix, filename)
filenameWithPrefix := path.Join(l.cfg.LocalPrefix(), filename)

dirName := path.Dir(filenameWithPrefix)

l.cfg.Logger().Debug().Str("dirname", dirName).Msg("Creating directory")
err := os.MkdirAll(dirName, os.ModePerm)
if err != nil {
return err
}

l.cfg.Logger().Debug().Str("filenameWithPrefix", dirName).Msg("Persisting file")
err = ioutil.WriteFile(filenameWithPrefix, dataAsByte, 0644)
if err != nil {
return err
Expand Down
10 changes: 9 additions & 1 deletion services/rougecombien/cmd/rougecombien/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (

func main() {
cfg := config.NewConfig()
rougecombien := apps.NewRougecombien(cfg)

cliApp := cli.App{
Name: "rougecombien",
Action: func(*cli.Context) error {
rougecombien := apps.NewRougecombien(cfg)
return rougecombien.Run(context.Background())
},
}
Expand All @@ -24,9 +24,17 @@ func main() {
{
Name: "dev",
Action: func(c *cli.Context) error {
rougecombien := apps.NewRougecombienDev(cfg)
return rougecombien.RunDevelopment(context.Background())
},
},
{
Name: "inline",
Action: func(c *cli.Context) error {
rougecombien := apps.NewRougecombienDev(cfg)
return rougecombien.RunInline(context.Background())
},
},
}

if err := cliApp.Run(os.Args); err != nil {
Expand Down
Loading

0 comments on commit 5bfd4cf

Please sign in to comment.