Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/worker: run task from file #594

Merged
merged 7 commits into from
Jul 10, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cmd/node/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ import (
func Run(ctx context.Context, conf config.Config, log *logger.Logger) error {
conf.Node.ID = scheduler.GenNodeID()

w, err := workerCmd.NewWorker(ctx, conf, log)
if err != nil {
return err
factory := func(ctx context.Context, taskID string) error {
w, err := workerCmd.NewWorker(ctx, conf, log, &workerCmd.Options{
TaskID: taskID,
})
if err != nil {
return err
}
w.Run(ctx)
return nil
}

n, err := scheduler.NewNodeProcess(ctx, conf, w.Run, log)
n, err := scheduler.NewNodeProcess(ctx, conf, factory, log)
if err != nil {
return err
}
Expand Down
225 changes: 156 additions & 69 deletions cmd/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,104 +14,191 @@ import (
"github.com/ohsu-comp-bio/funnel/logger"
"github.com/ohsu-comp-bio/funnel/storage"
"github.com/ohsu-comp-bio/funnel/tes"
"github.com/ohsu-comp-bio/funnel/util"
"github.com/ohsu-comp-bio/funnel/worker"
)

// Run runs the "worker run" command.
func Run(ctx context.Context, conf config.Config, log *logger.Logger, taskID string) error {
w, err := NewWorker(ctx, conf, log)
func Run(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error {
w, err := NewWorker(ctx, conf, log, opts)
if err != nil {
return err
}
return w.Run(ctx, taskID)
return w.Run(ctx)
}

// NewWorker returns a new Funnel worker based on the given config.
func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger) (*worker.DefaultWorker, error) {
func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) (*worker.DefaultWorker, error) {
log.Debug("NewWorker", "config", conf)

var err error
var db tes.ReadOnlyServer
var reader worker.TaskReader
var writer events.Writer
var writers events.MultiWriter

eventWriterSet := map[string]interface{}{
strings.ToLower(conf.Database): nil,
}
for _, w := range conf.EventWriters {
eventWriterSet[strings.ToLower(w)] = nil
}

for e := range eventWriterSet {
switch e {
case "log":
writer = &events.Logger{Log: log}
case "boltdb":
writer, err = events.NewRPCWriter(ctx, conf.RPCClient)
case "badger":
writer, err = events.NewRPCWriter(ctx, conf.RPCClient)
case "dynamodb":
writer, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "datastore":
writer, err = datastore.NewDatastore(conf.Datastore)
case "elastic":
writer, err = elastic.NewElastic(conf.Elastic)
case "kafka":
writer, err = events.NewKafkaWriter(ctx, conf.Kafka)
case "pubsub":
writer, err = events.NewPubSubWriter(ctx, conf.PubSub)
case "mongodb":
writer, err = mongodb.NewMongoDB(conf.MongoDB)
default:
err = fmt.Errorf("unknown event writer: %s", e)
}
if err != nil {
return nil, fmt.Errorf("error occurred while initializing the %s event writer: %v", e, err)
}
if writer != nil {
writers = append(writers, writer)
}
err := validateConfig(conf, opts)
if err != nil {
return nil, fmt.Errorf("validating config: %v", err)
}

writer = &events.SystemLogFilter{Writer: &writers, Level: conf.Logger.Level}
writer = &events.ErrLogger{Writer: writer, Log: log}

switch strings.ToLower(conf.Database) {
case "datastore":
db, err = datastore.NewDatastore(conf.Datastore)
case "dynamodb":
db, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "elastic":
db, err = elastic.NewElastic(conf.Elastic)
case "mongodb":
db, err = mongodb.NewMongoDB(conf.MongoDB)
case "boltdb":
reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient)
case "badger":
reader, err = worker.NewRPCTaskReader(ctx, conf.RPCClient)
default:
err = fmt.Errorf("unknown database: '%s'", conf.Database)
// Construct a set of event writers based on the config.
builder := eventWriterBuilder{}
// If the task comes from a file or string,
// don't assume we should write to the database.
if opts.TaskFile == "" && opts.TaskBase64 == "" {
builder.Add(ctx, conf.Database, conf, log)
}
// Add configured event writers.
for _, e := range conf.EventWriters {
builder.Add(ctx, e, conf, log)
}
// Get the built writer.
writer, err := builder.Writer()
if err != nil {
return nil, fmt.Errorf("failed to instantiate database client: %v", err)
return nil, fmt.Errorf("creating event writers: %v", err)
}
if reader == nil {
reader = worker.NewGenericTaskReader(db.GetTask)

// Wrap the event writers in a couple filters.
writer = &events.SystemLogFilter{Writer: writer, Level: conf.Logger.Level}
writer = &events.ErrLogger{Writer: writer, Log: log}

// Get the task source reader: database, file, etc.
reader, err := newTaskReader(ctx, conf, opts)
if err != nil {
return nil, fmt.Errorf("creating task reader: %v", err)
}

// Initialize task storage client.
store, err := storage.NewMux(conf)
if err != nil {
return nil, fmt.Errorf("failed to instantiate Storage backend: %v", err)
}
store.AttachLogger(log)

w := &worker.DefaultWorker{
return &worker.DefaultWorker{
Conf: conf.Worker,
Store: store,
TaskReader: reader,
EventWriter: writer,
}, nil
}

// newTaskReader finds a TaskReader implementation that matches the config
// and commandline options.
func newTaskReader(ctx context.Context, conf config.Config, opts *Options) (worker.TaskReader, error) {

switch {
// These readers are used to read a local task from a file, cli arg, etc.
case opts.TaskFile != "":
return worker.NewFileTaskReader(opts.TaskFile)

case opts.TaskBase64 != "":
return worker.NewBase64TaskReader(opts.TaskBase64)
}

return w, nil
switch strings.ToLower(conf.Database) {
// These readers will connect to the configured task database.
case "datastore":
db, err := datastore.NewDatastore(conf.Datastore)
return newDatabaseTaskReader(opts.TaskID, db, err)

case "dynamodb":
db, err := dynamodb.NewDynamoDB(conf.DynamoDB)
return newDatabaseTaskReader(opts.TaskID, db, err)

case "elastic":
db, err := elastic.NewElastic(conf.Elastic)
return newDatabaseTaskReader(opts.TaskID, db, err)

case "mongodb":
db, err := mongodb.NewMongoDB(conf.MongoDB)
return newDatabaseTaskReader(opts.TaskID, db, err)

// These readers connect via RPC (because the database is embedded in the server).
case "boltdb", "badger":
return worker.NewRPCTaskReader(ctx, conf.RPCClient, opts.TaskID)

// No matching reader. Fail.
default:
return nil, fmt.Errorf("no matching task reader found")
}
}

// newDatabaseTaskReader helps create a generic task reader wrapper
// for the given database backend.
func newDatabaseTaskReader(taskID string, db tes.ReadOnlyServer, err error) (worker.TaskReader, error) {
if err != nil {
return nil, fmt.Errorf("creating database task reader: %v", err)
}
return worker.NewGenericTaskReader(db.GetTask, taskID), nil
}

// eventWriterBuilder is a helper for building a set of event writers,
// collecting errors, de-duplicating config, etc.
type eventWriterBuilder struct {
errors util.MultiError
writers events.MultiWriter
// seen tracks which event writers have already been built,
// so we don't build the same one twice.
seen map[string]bool
}

// Writers gets all the event writers and errors collected by multiple calls to Add().
func (e *eventWriterBuilder) Writer() (events.Writer, error) {
return &e.writers, e.errors.ToError()
}

// Add creates a new event writer by name and adds it to the builder.
func (e *eventWriterBuilder) Add(ctx context.Context, name string, conf config.Config, log *logger.Logger) {
if name == "" {
return
}

if e.seen == nil {
e.seen = map[string]bool{}
}

// If we've already created this event writer "name", skip it.
if _, ok := e.seen[name]; ok {
return
}
e.seen[name] = true

var err error
var writer events.Writer

switch name {
case "log":
writer = &events.Logger{Log: log}
case "boltdb", "badger":
writer, err = events.NewRPCWriter(ctx, conf.RPCClient)
case "dynamodb":
writer, err = dynamodb.NewDynamoDB(conf.DynamoDB)
case "datastore":
writer, err = datastore.NewDatastore(conf.Datastore)
case "elastic":
writer, err = elastic.NewElastic(conf.Elastic)
case "kafka":
writer, err = events.NewKafkaWriter(ctx, conf.Kafka)
case "pubsub":
writer, err = events.NewPubSubWriter(ctx, conf.PubSub)
case "mongodb":
writer, err = mongodb.NewMongoDB(conf.MongoDB)
default:
err = fmt.Errorf("unknown event writer: %s", name)
}

if err != nil {
e.errors = append(e.errors, err)
} else {
e.writers = append(e.writers, writer)
}
}

func validateConfig(conf config.Config, opts *Options) error {
// If the task reader is a file or string,
// only a subset of event writers are supported.
if opts.TaskFile != "" || opts.TaskBase64 != "" {
for _, e := range conf.EventWriters {
if e != "log" && e != "kafka" && e != "pubsub" {
return fmt.Errorf("event writer %q is not supported with a task file/string reader", e)
}
}
}
return nil
}
21 changes: 15 additions & 6 deletions cmd/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@ import (
"github.com/spf13/cobra"
)

// Options holds a few CLI options for worker entrypoints.
type Options struct {
TaskID string
TaskFile string
TaskBase64 string
}

// NewCommand returns the worker command
func NewCommand() *cobra.Command {
cmd, _ := newCommandHooks()
return cmd
}

type hooks struct {
Run func(ctx context.Context, conf config.Config, log *logger.Logger, taskID string) error
Run func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error
}

func newCommandHooks() (*cobra.Command, *hooks) {
Expand All @@ -32,8 +39,8 @@ func newCommandHooks() (*cobra.Command, *hooks) {
configFile string
conf config.Config
flagConf config.Config
taskID string
)
opts := &Options{}

cmd := &cobra.Command{
Use: "worker",
Expand All @@ -59,21 +66,23 @@ func newCommandHooks() (*cobra.Command, *hooks) {
Short: "Run a task directly, bypassing the server.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
if taskID == "" {
return fmt.Errorf("no taskID was provided")
if opts.TaskID == "" && opts.TaskFile == "" && opts.TaskBase64 == "" {
return fmt.Errorf("no task was provided")
}

log := logger.NewLogger("worker", conf.Logger)
logger.SetGRPCLogger(log)
ctx, cancel := context.WithCancel(context.Background())
ctx = util.SignalContext(ctx, time.Millisecond*500, syscall.SIGINT, syscall.SIGTERM)
defer cancel()
return hooks.Run(ctx, conf, log, taskID)
return hooks.Run(ctx, conf, log, opts)
},
}

f = run.Flags()
f.StringVarP(&taskID, "taskID", "t", taskID, "Task ID")
f.StringVarP(&opts.TaskID, "taskID", "t", opts.TaskID, "Task ID")
f.StringVarP(&opts.TaskFile, "taskFile", "f", opts.TaskFile, "Task file")
f.StringVarP(&opts.TaskBase64, "taskBase64", "b", opts.TaskBase64, "Task base64")
cmd.AddCommand(run)

return cmd, hooks
Expand Down
2 changes: 1 addition & 1 deletion cmd/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestPersistentPreRun(t *testing.T) {
defer cleanup()

c, h := newCommandHooks()
h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, taskID string) error {
h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error {
if conf.Server.HostName != host {
t.Fatal("unexpected Server.HostName in config", conf.Server.HostName)
}
Expand Down
Loading