Skip to content

Commit

Permalink
Refactor jobs to refer more specifically to json.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Leduc-Hamel committed Mar 16, 2021
1 parent 82e31c0 commit d19c369
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 100 deletions.
2 changes: 1 addition & 1 deletion pkg/app/faktory.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewFaktory(cfg *config.Config, store store.Store) runnable.Runnable {
storeJob := jobs.NewStoreJob("store-rougecombien", cfg, store)
manager := jobs.NewFaktoryManager(cfg)
manager.Register(storeJob)
manager.Register(rougecombien.NewOverflowjob(cfg, manager, storeJob))
manager.Register(rougecombien.NewJsonJob(cfg, manager, storeJob))

return &Faktory{
cfg: cfg,
Expand Down
17 changes: 16 additions & 1 deletion pkg/app/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ type Store struct {
value string
}

type data struct {
Timestamp int64 `json:"timestamp"`
ID string `json:"id"`
Name string `json:"name"`
Value interface{} `json:"value"`
}

func NewStore(cfg *config.Config, store store.Store, kind string, key string, value string) runnable.Runnable {
return &Store{
cfg: cfg,
Expand All @@ -39,12 +46,20 @@ func (s *Store) Run(ctx context.Context) error {
return fmt.Errorf("Error while unmarshaling value (%s): %w", s.value, err)
}

err = s.store.Persist(context.Background(), &store.Data{
timestamp := time.Now()
filename := fmt.Sprintf("%s/%s/%s.json", s.kind, timestamp.Format("20060102"), timestamp.Format("1504"))
dataStr, err := json.Marshal(data{
Timestamp: time.Now().Unix(),
Name: s.kind,
ID: s.key,
Value: result,
})

if err != nil {
return fmt.Errorf("Error while marshaling data: %w", err)
}

err = s.store.Persist(context.Background(), filename, string(dataStr))
if err != nil {
return err
}
Expand Down
11 changes: 0 additions & 11 deletions pkg/jobs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package jobs

import (
"context"

"github.com/mlhamel/trieugene/pkg/store"
)

type Job interface {
Expand All @@ -24,12 +22,3 @@ type Message struct {
Kind string `json:"kind"`
Value interface{} `json:"value"`
}

func (m *Message) Data() *store.Data {
return &store.Data{
Timestamp: m.HappenedAt,
ID: m.ID,
Name: m.Kind,
Value: m.Value,
}
}
16 changes: 15 additions & 1 deletion pkg/jobs/storejob.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package jobs

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/mitchellh/mapstructure"
"github.com/mlhamel/trieugene/pkg/config"
Expand Down Expand Up @@ -55,7 +58,18 @@ func (r *StoreJob) Perform(ctx context.Context, args ...interface{}) error {
}

r.cfg.Logger().Debug().Str("id", msg.ID).Int64("HappenedAt", msg.HappenedAt).Msg("Persisting data")
if err := r.store.Persist(ctx, msg.Data()); err != nil {

datetime := time.Unix(msg.HappenedAt, 0)
filename := fmt.Sprintf("%s/%s/%s.json", msg.Kind, datetime.Format("20060102"), datetime.Format("1504"))
body, err := json.Marshal(data)

if err != nil {
return err
}

bodyStr := string(body)

if err := r.store.Persist(ctx, filename, bodyStr); err != nil {
r.cfg.Logger().Error().Err(err).Msg("Error while trying to persist data")
return err
}
Expand Down
47 changes: 0 additions & 47 deletions pkg/store/datadog.go

This file was deleted.

8 changes: 3 additions & 5 deletions pkg/store/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package store
import (
"bytes"
"context"
"fmt"
"io"

"cloud.google.com/go/storage"
Expand Down Expand Up @@ -40,12 +39,11 @@ func (g *GoogleCloudStorage) Setup(ctx context.Context) error {
return g.bucket.Create(ctx, g.cfg.ProjectID(), nil)
}

func (g *GoogleCloudStorage) Persist(ctx context.Context, data *Data) error {
func (g *GoogleCloudStorage) Persist(ctx context.Context, filename string, data string) error {
g.cfg.Logger().Debug().Msgf("Store/GoogleCloudStorage/Persist: Start")
fileName := buildKey(data.Name, data.Timestamp, data.ID)
object := g.bucket.Object(fileName)
object := g.bucket.Object(filename)
w := object.NewWriter(ctx)
reader := bytes.NewReader([]byte(fmt.Sprintf("%v", data)))
reader := bytes.NewReader([]byte(data))

if _, err := io.Copy(w, reader); err != nil {
return err
Expand Down
9 changes: 1 addition & 8 deletions pkg/store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,7 @@ import (
"context"
)

type Data struct {
Timestamp int64 `json:"timestamp"`
ID string `json:"id"`
Name string `json:"name"`
Value interface{} `json:"value"`
}

type Store interface {
Setup(ctx context.Context) error
Persist(context.Context, *Data) error
Persist(ctx context.Context, filename string, data string) error
}
20 changes: 4 additions & 16 deletions pkg/store/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package store

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -64,21 +62,11 @@ func (s *S3) Setup(ctx context.Context) error {
return nil
}

func (s *S3) Persist(ctx context.Context, data *Data) error {
datetime := time.Unix(data.Timestamp, 0)
key := fmt.Sprintf("%s/%s/%s.json", data.Name, datetime.Format("20060102"), datetime.Format("1504"))
body, err := json.Marshal(data)

if err != nil {
return err
}

bodyStr := string(body)

_, err = s.client.PutObjectWithContext(ctx, &s3.PutObjectInput{
func (s *S3) Persist(ctx context.Context, filename string, data string) error {
_, err := s.client.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(s.params.Bucket),
Key: aws.String(key),
Body: strings.NewReader(bodyStr),
Key: aws.String(filename),
Body: strings.NewReader(data),
})
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions services/rougecombien/pkg/apps/rougecombien.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ func NewRougecombien(cfg *config.Config) *Rougecombien {
}

func (r *Rougecombien) Run(ctx context.Context) error {
return r.manager.Perform(jobs.NewOverflowjob(r.cfg, r.manager, r.storeJob), &trieugene.Message{})
return r.manager.Perform(jobs.NewJsonJob(r.cfg, r.manager, r.storeJob), &trieugene.Message{})
}

func (r *Rougecombien) RunDevelopment(ctx context.Context) error {
run(r.setupDevelopment())

return r.manager.Perform(jobs.NewOverflowjob(r.cfg, r.manager, r.storeJob), &trieugene.Message{})
return r.manager.Perform(jobs.NewJsonJob(r.cfg, r.manager, r.storeJob), &trieugene.Message{})
}

func (r *Rougecombien) genericRun(ctx context.Context, result scraper.Result) error {
return r.manager.Perform(jobs.NewOverflowjob(r.cfg, r.manager, r.storeJob), &trieugene.Message{})
return r.manager.Perform(jobs.NewJsonJob(r.cfg, r.manager, r.storeJob), &trieugene.Message{})
}

func (r *Rougecombien) setupDevelopment() runnable.Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@ import (
"github.com/mlhamel/trieugene/services/rougecombien/pkg/scraper"
)

type OverflowJob struct {
type JsonJob struct {
cfg *config.Config
manager trieugene.Manager
storejob trieugene.Job
}

func NewOverflowjob(cfg *config.Config, manager trieugene.Manager, storejob trieugene.Job) trieugene.Job {
return &OverflowJob{
func NewJsonJob(cfg *config.Config, manager trieugene.Manager, storejob trieugene.Job) trieugene.Job {
return &JsonJob{
cfg: cfg,
manager: manager,
storejob: storejob,
}
}

func (o *OverflowJob) Kind() string {
return "overflow-rougecombien"
func (o *JsonJob) Kind() string {
return "json-rougecombien"
}

func (o *OverflowJob) Perform(ctx context.Context, args ...interface{}) error {
func (o *JsonJob) Perform(ctx context.Context, args ...interface{}) error {
return scraper.NewScraper(o.cfg, func(ctx context.Context, result scraper.Result) error {
return o.manager.Perform(o.storejob, &trieugene.Message{
ID: result.Sha1(),
Expand All @@ -38,7 +38,7 @@ func (o *OverflowJob) Perform(ctx context.Context, args ...interface{}) error {
}).Run(ctx)
}

func (o *OverflowJob) Run(ctx context.Context, args ...interface{}) error {
func (o *JsonJob) Run(ctx context.Context, args ...interface{}) error {
if err := o.Perform(ctx, args); err != nil {
return err
}
Expand Down

0 comments on commit d19c369

Please sign in to comment.