From a3fa7acebd33fd1e479205311594f32d6e57ddb1 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 28 Jul 2023 19:21:21 -0700 Subject: [PATCH] Revert added error handling - This linting task is separate from TESv1.1 compliance and could use it's own PR/issue for discussion --- cmd/aws/batch/createall.go | 5 +- cmd/aws/batch/createjobdef.go | 5 +- cmd/completion.go | 6 +- cmd/node/run.go | 14 +--- cmd/run/cmd.go | 2 +- cmd/run/flags.go | 10 +-- cmd/run/task_group.go | 5 +- cmd/task/create.go | 2 +- cmd/task/task_test.go | 90 +++++----------------- cmd/worker/worker_test.go | 20 +---- compute/batch/backend.go | 31 +++----- compute/hpc_backend.go | 19 ++--- compute/kubernetes/backend.go | 10 +-- compute/local/backend.go | 5 +- compute/scheduler/node.go | 9 +-- compute/scheduler/predicates_test.go | 14 +--- compute/scheduler/scheduler.go | 21 +----- compute/scheduler/testutils_test.go | 9 +-- compute/scheduler/util.go | 11 ++- config/config_test.go | 5 +- config/gce/meta_test.go | 5 +- database/boltdb/events.go | 32 ++------ database/boltdb/new.go | 53 +++---------- database/boltdb/scheduler.go | 15 +--- database/boltdb/tes.go | 25 ++---- database/elastic/scheduler.go | 5 +- database/mongodb/events.go | 2 +- database/mongodb/new.go | 2 - events/executor.go | 25 ++---- events/kafka.go | 5 +- events/pubsub.go | 12 +-- logger/text_formatter_test.go | 5 +- server/server.go | 5 +- storage/ftp.go | 11 +-- storage/http.go | 4 +- storage/local_test.go | 30 ++------ storage/storage_test.go | 16 ++-- tes/client.go | 9 +-- tes/urlvalues.go | 10 +-- tes/utils.go | 5 +- tests/config_utils.go | 10 +-- tests/core/basic_test.go | 21 +----- tests/core/worker_test.go | 10 +-- tests/funnel_utils.go | 57 +++++++------- tests/gridengine/gridengine_test.go | 1 + tests/pbs/pbs_test.go | 1 + tests/perf/perf_test.go | 5 +- tests/scheduler/node_health_test.go | 15 +--- tests/scheduler/node_test.go | 109 ++++++--------------------- tests/scheduler/scheduler_test.go | 5 +- tests/slurm/slurm_test.go | 1 + tests/storage/amazon_s3_test.go | 5 +- tests/storage/gs_test.go | 5 +- tests/storage/multi_s3_test.go | 10 +-- tests/storage/storage_test.go | 19 ++--- tests/storage/swift_test.go | 5 +- util/dockerutil/docker.go | 3 +- util/fsutil/io.go | 10 +-- util/idle_timeout.go | 36 ++++----- util/retry_test.go | 10 +-- util/ring/ring_test.go | 10 +-- worker/docker.go | 33 +++----- worker/file_mapper.go | 9 ++- worker/step.go | 14 ++-- worker/storage.go | 50 +++--------- worker/taskreader_test.go | 4 +- worker/worker.go | 89 ++++++---------------- 67 files changed, 293 insertions(+), 823 deletions(-) diff --git a/cmd/aws/batch/createall.go b/cmd/aws/batch/createall.go index 48713ee20..dd5d393ca 100644 --- a/cmd/aws/batch/createall.go +++ b/cmd/aws/batch/createall.go @@ -44,10 +44,7 @@ var createCmd = &cobra.Command{ if funnelConfigFile != "" { funnelConf := config.Config{} - err := config.ParseFile(funnelConfigFile, &funnelConf) - if err != nil { - return err - } + config.ParseFile(funnelConfigFile, &funnelConf) conf.Funnel = funnelConf } diff --git a/cmd/aws/batch/createjobdef.go b/cmd/aws/batch/createjobdef.go index 61f66ab18..c61186470 100644 --- a/cmd/aws/batch/createjobdef.go +++ b/cmd/aws/batch/createjobdef.go @@ -38,10 +38,7 @@ var jobDefCmd = &cobra.Command{ if funnelConfigFile != "" { funnelConf := config.Config{} - err := config.ParseFile(funnelConfigFile, &funnelConf) - if err != nil { - return err - } + config.ParseFile(funnelConfigFile, &funnelConf) conf.Funnel = funnelConf } diff --git a/cmd/completion.go b/cmd/completion.go index 31127fb4b..3e3ab0c51 100644 --- a/cmd/completion.go +++ b/cmd/completion.go @@ -1,7 +1,6 @@ package cmd import ( - "log" "os" "github.com/spf13/cobra" @@ -18,10 +17,7 @@ var bash = &cobra.Command{ Long: `This command generates bash CLI completion code. Add "source <(funnel completion bash)" to your bash profile.`, Run: func(cmd *cobra.Command, args []string) { - err := RootCmd.GenBashCompletion(os.Stdout) - if err != nil { - log.Fatalf("Error generating bash completion: %v", err) - } + RootCmd.GenBashCompletion(os.Stdout) }, } diff --git a/cmd/node/run.go b/cmd/node/run.go index 44076da43..c0492273a 100644 --- a/cmd/node/run.go +++ b/cmd/node/run.go @@ -25,10 +25,7 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger) error { if err != nil { return err } - err = w.Run(ctx) - if err != nil { - return err - } + w.Run(ctx) return nil } @@ -37,8 +34,8 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger) error { return err } - _, cancel := context.WithCancel(context.Background()) - runctx := util.SignalContext(ctx, time.Nanosecond, syscall.SIGINT, syscall.SIGTERM) + runctx, cancel := context.WithCancel(context.Background()) + runctx = util.SignalContext(ctx, time.Nanosecond, syscall.SIGINT, syscall.SIGTERM) defer cancel() hupsig := make(chan os.Signal, 1) @@ -54,10 +51,7 @@ func Run(ctx context.Context, conf config.Config, log *logger.Logger) error { }() signal.Notify(hupsig, syscall.SIGHUP) - err = n.Run(runctx) - if err != nil { - return err - } + n.Run(runctx) return nil } diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 29781a388..6d8bd5206 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -22,7 +22,7 @@ var Cmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { err := Run(args) if err != nil { - err := cmd.Usage() + cmd.Usage() return err } return err diff --git a/cmd/run/flags.go b/cmd/run/flags.go index eedc7d2d6..2a2befc75 100644 --- a/cmd/run/flags.go +++ b/cmd/run/flags.go @@ -172,10 +172,7 @@ func parseTopLevelArgs(vals *flagVals, args []string) error { func parseTaskArgs(vals *flagVals, args []string) { fl := newFlags(vals) - err := fl.Parse(args) - if err != nil { - return - } + fl.Parse(args) buildExecs(fl, vals, args) } @@ -188,7 +185,7 @@ func parseTaskArgs(vals *flagVals, args []string) { func buildExecs(flags *pflag.FlagSet, vals *flagVals, args []string) { vals.execs = nil var exec *executor - err := flags.ParseAll(args, func(f *pflag.Flag, value string) error { + flags.ParseAll(args, func(f *pflag.Flag, value string) error { switch f.Name { case "sh", "exec": if exec != nil { @@ -210,9 +207,6 @@ func buildExecs(flags *pflag.FlagSet, vals *flagVals, args []string) { } return nil }) - if err != nil { - return - } if exec != nil { vals.execs = append(vals.execs, *exec) } diff --git a/cmd/run/task_group.go b/cmd/run/task_group.go index c0d0b46d8..a46615606 100644 --- a/cmd/run/task_group.go +++ b/cmd/run/task_group.go @@ -56,10 +56,7 @@ func (tg *taskGroup) _run(task *tes.Task, wait bool, waitFor []string) error { if len(waitFor) > 0 { for _, tid := range waitFor { - err := tg.client.WaitForTask(context.Background(), tid) - if err != nil { - return err - } + tg.client.WaitForTask(context.Background(), tid) } } diff --git a/cmd/task/create.go b/cmd/task/create.go index ec125a8e8..c79e1a5b8 100644 --- a/cmd/task/create.go +++ b/cmd/task/create.go @@ -22,10 +22,10 @@ func Create(server string, files []string, reader io.Reader, writer io.Writer) e for _, taskFile := range files { f, err := os.Open(taskFile) + defer f.Close() if err != nil { return err } - defer f.Close() reader = io.MultiReader(reader, f) } diff --git a/cmd/task/task_test.go b/cmd/task/task_test.go index 390b1500b..2a31a1abd 100644 --- a/cmd/task/task_test.go +++ b/cmd/task/task_test.go @@ -20,10 +20,7 @@ func TestGet(t *testing.T) { } cmd.SetArgs([]string{"get", "--view", "MINIMAL", "1", "2"}) - err := cmd.Execute() - if err != nil { - t.Error(err) - } + cmd.Execute() } // "get" command should have default view of FULL @@ -38,10 +35,7 @@ func TestGetDefaultView(t *testing.T) { } cmd.SetArgs([]string{"get", "1", "2"}) - err := cmd.Execute() - if err != nil { - t.Error(err) - } + cmd.Execute() } func TestList(t *testing.T) { @@ -55,10 +49,7 @@ func TestList(t *testing.T) { } cmd.SetArgs([]string{"list", "--view", "FULL"}) - err := cmd.Execute() - if err != nil { - t.Error(err) - } + cmd.Execute() } // Test that the server URL defaults to localhost:8000 @@ -97,34 +88,19 @@ func TestServerDefault(t *testing.T) { } cmd.SetArgs([]string{"create", "foo.json"}) - err := cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"list"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"get", "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"cancel", "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"wait", "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() } // Test that the server URL may be set via a FUNNEL_SERVER environment @@ -166,34 +142,19 @@ func TestServerEnv(t *testing.T) { } cmd.SetArgs([]string{"create", "foo.json"}) - err := cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"list"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"get", "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"cancel", "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"wait", "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() } // Test that the server flag overrides the FUNNEL_SERVER env var @@ -235,32 +196,17 @@ func TestServerFlagOverride(t *testing.T) { } cmd.SetArgs([]string{"create", "-S", srv, "foo.json"}) - err := cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"list", "-S", srv}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"get", "-S", srv, "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"cancel", "-S", srv, "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() cmd.SetArgs([]string{"wait", "-S", srv, "1"}) - err = cmd.Execute() - if err != nil { - t.Errorf("%s", err) - } + cmd.Execute() } diff --git a/cmd/worker/worker_test.go b/cmd/worker/worker_test.go index 0b914bb68..e280601d3 100644 --- a/cmd/worker/worker_test.go +++ b/cmd/worker/worker_test.go @@ -65,10 +65,7 @@ func TestTaskFileOption(t *testing.T) { } c.SetArgs([]string{"run", "--taskFile", "test.task.json"}) - err := c.Execute() - if err != nil { - t.Fatal(err) - } + c.Execute() h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { if opts.TaskFile != "test.task.json" { @@ -78,10 +75,7 @@ func TestTaskFileOption(t *testing.T) { } c.SetArgs([]string{"run", "-f", "test.task.json"}) - err = c.Execute() - if err != nil { - t.Fatal(err) - } + c.Execute() h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { if opts.TaskBase64 != "abcd" { @@ -91,10 +85,7 @@ func TestTaskFileOption(t *testing.T) { } c.SetArgs([]string{"run", "--taskBase64", "abcd"}) - err = c.Execute() - if err != nil { - t.Fatal(err) - } + c.Execute() h.Run = func(ctx context.Context, conf config.Config, log *logger.Logger, opts *Options) error { if opts.TaskBase64 != "abcd" { @@ -104,8 +95,5 @@ func TestTaskFileOption(t *testing.T) { } c.SetArgs([]string{"run", "-b", "abcd"}) - err = c.Execute() - if err != nil { - t.Fatal(err) - } + c.Execute() } diff --git a/compute/batch/backend.go b/compute/batch/backend.go index 4bd6972c2..ee3dd7bac 100644 --- a/compute/batch/backend.go +++ b/compute/batch/backend.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "regexp" + "strconv" "time" "github.com/aws/aws-sdk-go/aws" @@ -90,10 +91,12 @@ func (b *Backend) Submit(task *tes.Task) error { ram := int64(task.Resources.RamGb * 953.674) if ram > 0 { req.ContainerOverrides.Memory = aws.Int64(ram) - // req.ContainerOverrides.ResourceRequirements = []*batch.ResourceRequirement{ - // 0: aws.String("GPU"), - // 0: aws.Int64(ram), - // } + req.ContainerOverrides.ResourceRequirements = []*batch.ResourceRequirement{ + &batch.ResourceRequirement { + Type: aws.String("MEMORY"), + Value: aws.String(strconv.FormatInt(ram, 10)), + } + } } vcpus := int64(task.Resources.CpuCores) @@ -104,11 +107,8 @@ func (b *Backend) Submit(task *tes.Task) error { resp, err := b.client.SubmitJob(req) if err != nil { - err := b.event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) - if err != nil { - b.log.Error("Error writing state event", err) - } - err = b.event.WriteEvent( + b.event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) + b.event.WriteEvent( ctx, events.NewSystemLog( task.Id, 0, 0, "error", @@ -116,9 +116,6 @@ func (b *Backend) Submit(task *tes.Task) error { map[string]string{"error": err.Error()}, ), ) - if err != nil { - b.log.Error("Error writing state event", err) - } return err } @@ -224,11 +221,8 @@ ReconcileLoop: jstate := *j.Status if jstate == "FAILED" { - err := b.event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) - if err != nil { - b.log.Error("Error writing state event", err) - } - err = b.event.WriteEvent( + b.event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) + b.event.WriteEvent( ctx, events.NewSystemLog( task.Id, 0, 0, "error", @@ -236,9 +230,6 @@ ReconcileLoop: map[string]string{"error": *j.StatusReason, "awsbatch_id": *j.JobId}, ), ) - if err != nil { - b.log.Error("Error writing state event", err) - } } } diff --git a/compute/hpc_backend.go b/compute/hpc_backend.go index 06c715629..a462668f8 100644 --- a/compute/hpc_backend.go +++ b/compute/hpc_backend.go @@ -78,8 +78,8 @@ func (b *HPCBackend) Submit(task *tes.Task) error { err = cmd.Run() fmt.Println("DEBUG err:", err) if err != nil { - _ = b.Event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) - _ = b.Event.WriteEvent( + b.Event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) + b.Event.WriteEvent( ctx, events.NewSystemLog( task.Id, 0, 0, "error", @@ -197,11 +197,8 @@ ReconcileLoop: } if t.TESState == tes.SystemError { - err := b.Event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) - if err != nil { - b.Log.Error("Error writing state event", err) - } - err = b.Event.WriteEvent( + b.Event.WriteEvent(ctx, events.NewState(task.Id, tes.SystemError)) + b.Event.WriteEvent( ctx, events.NewSystemLog( task.Id, 0, 0, "error", @@ -213,14 +210,8 @@ ReconcileLoop: }, ), ) - if err != nil { - b.Log.Error("Error writing state event", err) - } if t.Remove { - err := exec.Command(b.CancelCmd, t.ID).Run() - if err != nil { - b.Log.Error("Error calling CancelCmd", err) - } + exec.Command(b.CancelCmd, t.ID).Run() } } } diff --git a/compute/kubernetes/backend.go b/compute/kubernetes/backend.go index 6f0754ebd..9a29c263e 100644 --- a/compute/kubernetes/backend.go +++ b/compute/kubernetes/backend.go @@ -242,11 +242,8 @@ ReconcileLoop: if err != nil { b.log.Error("reconcile: marshal failed job conditions", "taskID", j.Name, "error", err) } - err = b.event.WriteEvent(ctx, events.NewState(j.Name, tes.SystemError)) - if err != nil { - b.log.Error("reconcile: writing failed job state", "taskID", j.Name, "error", err) - } - err = b.event.WriteEvent( + b.event.WriteEvent(ctx, events.NewState(j.Name, tes.SystemError)) + b.event.WriteEvent( ctx, events.NewSystemLog( j.Name, 0, 0, "error", @@ -254,9 +251,6 @@ ReconcileLoop: map[string]string{"error": string(conds)}, ), ) - if err != nil { - b.log.Error("reconcile: writing failed job state", "taskID", j.Name, "error", err) - } if disableCleanup { continue ReconcileLoop } diff --git a/compute/local/backend.go b/compute/local/backend.go index d55aa06b8..6bc799110 100644 --- a/compute/local/backend.go +++ b/compute/local/backend.go @@ -67,10 +67,7 @@ func (b *Backend) Submit(task *tes.Task) error { } go func() { - err = w.Run(ctx) - if err != nil { - b.log.Error("error calling Run", err) - } + w.Run(ctx) w.Close() }() diff --git a/compute/scheduler/node.go b/compute/scheduler/node.go index 76f082ca5..6f4df3bcf 100644 --- a/compute/scheduler/node.go +++ b/compute/scheduler/node.go @@ -41,7 +41,7 @@ func NewNodeProcess(ctx context.Context, conf config.Config, factory Worker, log conf: conf, client: cli, log: log, - resources: res, + resources: &res, workerRun: factory, workers: newRunSet(), timeout: timeout, @@ -98,10 +98,7 @@ func (n *NodeProcess) Run(ctx context.Context) error { n.client.Close() // The workers get 10 seconds to finish up. - err := n.workers.Wait(time.Second * 10) - if err != nil { - return err - } + n.workers.Wait(time.Second * 10) return nil case <-ticker.C: @@ -157,7 +154,7 @@ func (n *NodeProcess) sync(ctx context.Context) { // Node data has been updated. Send back to server for database update. var derr error - n.resources, derr = detectResources(n.conf.Node, n.conf.Worker.WorkDir) + *n.resources, derr = detectResources(n.conf.Node, n.conf.Worker.WorkDir) if derr != nil { n.log.Error("error detecting resources", "error", derr) } diff --git a/compute/scheduler/predicates_test.go b/compute/scheduler/predicates_test.go index b67a8353f..8d3a34f1d 100644 --- a/compute/scheduler/predicates_test.go +++ b/compute/scheduler/predicates_test.go @@ -8,12 +8,7 @@ import ( ) func TestZonesFitEmptyTask(t *testing.T) { - j := &tes.Task{} - w := &Node{} - err := ZonesFit(j, w) - if err != nil { - t.Error(err) - } + testEmptyTask(t, ZonesFit, "ZonesFit") } func TestResourcesFitEmptyTask(t *testing.T) { @@ -70,10 +65,5 @@ func testEmptyTask(t *testing.T, p Predicate, name string) { j := &tes.Task{} w := &Node{} - err := p(j, w) - if err == nil { - t.Errorf("Expected predicate '%s' to fail", name) - } else { - t.Logf("Predicate '%s' error: %s", name, err) - } + p(j, w) } diff --git a/compute/scheduler/scheduler.go b/compute/scheduler/scheduler.go index 3b34ef1b7..2d9d93190 100644 --- a/compute/scheduler/scheduler.go +++ b/compute/scheduler/scheduler.go @@ -63,17 +63,11 @@ func (s *Scheduler) CheckNodes() error { if node.State == NodeState_GONE { for _, tid := range node.TaskIds { - err = s.Event.WriteEvent(ctx, events.NewState(tid, tes.State_SYSTEM_ERROR)) - if err != nil { - return err - } - err = s.Event.WriteEvent(ctx, events.NewSystemLog(tid, 0, 0, "info", + s.Event.WriteEvent(ctx, events.NewState(tid, tes.State_SYSTEM_ERROR)) + s.Event.WriteEvent(ctx, events.NewSystemLog(tid, 0, 0, "info", "Cleaning up Task assigned to dead/gone node", map[string]string{ "nodeID": node.Id, })) - if err != nil { - return err - } } _, err = s.Nodes.DeleteNode(ctx, node) } else { @@ -106,13 +100,10 @@ func (s *Scheduler) Schedule(ctx context.Context) error { "nodeID", offer.Node.Id, "node", offer.Node, ) - err = s.Event.WriteEvent(ctx, events.NewSystemLog(task.Id, 0, 0, "info", + s.Event.WriteEvent(ctx, events.NewSystemLog(task.Id, 0, 0, "info", "Assigning task to node", map[string]string{ "nodeID": offer.Node.Id, })) - if err != nil { - return err - } // TODO this is important! write a test for this line. // when a task is assigned, its state is immediately Initializing @@ -125,15 +116,11 @@ func (s *Scheduler) Schedule(ctx context.Context) error { "taskID", task.Id, "nodeID", offer.Node.Id, ) - err := s.Event.WriteEvent(ctx, events.NewSystemLog(task.Id, 0, 0, "error", + s.Event.WriteEvent(ctx, events.NewSystemLog(task.Id, 0, 0, "error", "Error in AssignTask", map[string]string{ "error": err.Error(), "nodeID": offer.Node.Id, })) - - if err != nil { - return err - } continue } diff --git a/compute/scheduler/testutils_test.go b/compute/scheduler/testutils_test.go index 27e3fa2f3..6be88cd4c 100644 --- a/compute/scheduler/testutils_test.go +++ b/compute/scheduler/testutils_test.go @@ -32,7 +32,7 @@ func newTestNode(conf config.Config, t *testing.T) testNode { conf: conf, client: s, log: log, - resources: res, + resources: &res, workerRun: NoopWorker, workers: newRunSet(), timeout: util.NewIdleTimeout(time.Duration(conf.Node.Timeout)), @@ -57,10 +57,7 @@ func (t *testNode) Start() context.CancelFunc { t.Client.On("GetNode", mock.Anything, mock.Anything, mock.Anything). Return(&Node{}, nil) go func() { - err := t.NodeProcess.Run(ctx) - if err != nil { - return - } + t.NodeProcess.Run(ctx) close(t.done) }() return cancel @@ -87,7 +84,7 @@ func timeLimit(t *testing.T, d time.Duration) func() { go func() { select { case <-time.NewTimer(d).C: - t.Error("time limit expired") + t.Fatal("time limit expired") case <-stop: } }() diff --git a/compute/scheduler/util.go b/compute/scheduler/util.go index 82cea29d1..5e99b5e20 100644 --- a/compute/scheduler/util.go +++ b/compute/scheduler/util.go @@ -23,8 +23,7 @@ func GenNodeID() string { // // Upon error, detectResources will return the resources given by the config // with the error. -func detectResources(conf config.Node, workdir string) (*Resources, error) { - fmt.Println("detectResources") +func detectResources(conf config.Node, workdir string) (Resources, error) { res := Resources{ Cpus: conf.Resources.Cpus, RamGb: conf.Resources.RamGb, @@ -33,15 +32,15 @@ func detectResources(conf config.Node, workdir string) (*Resources, error) { cpuinfo, err := pscpu.Info() if err != nil { - return &res, fmt.Errorf("Error detecting cpu cores: %s", err) + return res, fmt.Errorf("Error detecting cpu cores: %s", err) } vmeminfo, err := psmem.VirtualMemory() if err != nil { - return &res, fmt.Errorf("Error detecting memory: %s", err) + return res, fmt.Errorf("Error detecting memory: %s", err) } diskinfo, err := psdisk.Usage(workdir) if err != nil { - return &res, fmt.Errorf("Error detecting available disk: %s", err) + return res, fmt.Errorf("Error detecting available disk: %s", err) } if conf.Resources.Cpus == 0 { @@ -62,5 +61,5 @@ func detectResources(conf config.Node, workdir string) (*Resources, error) { res.DiskGb = float64(diskinfo.Free) / float64(gb) } - return &res, nil + return res, nil } diff --git a/config/config_test.go b/config/config_test.go index a20a6d3bc..449543ef0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -11,10 +11,7 @@ Node: DiskGb: 50.0 ` conf := Config{} - err := Parse([]byte(yaml), &conf) - if err != nil { - t.Error(err) - } + Parse([]byte(yaml), &conf) if conf.Node.Resources.Cpus != 42 { t.Fatal("unexpected cpus") diff --git a/config/gce/meta_test.go b/config/gce/meta_test.go index 98dd30afa..4d7fc2b9b 100644 --- a/config/gce/meta_test.go +++ b/config/gce/meta_test.go @@ -40,10 +40,7 @@ func TestGetMetadata(t *testing.T) { if v, ok := r.URL.Query()["recursive"]; !ok || v[0] != "true" { t.Fatal("Expected recursive query") } - _, err := w.Write(loadTestData("metadata1")) - if err != nil { - t.Fatal(err) - } + w.Write(loadTestData("metadata1")) }) defer ts.Close() diff --git a/database/boltdb/events.go b/database/boltdb/events.go index 74c6fa4ed..d06cccd77 100644 --- a/database/boltdb/events.go +++ b/database/boltdb/events.go @@ -36,14 +36,8 @@ func (taskBolt *BoltDB) WriteEvent(ctx context.Context, req *events.Event) error return err } err = taskBolt.db.Update(func(tx *bolt.Tx) error { - err := tx.Bucket(TaskBucket).Put(idBytes, taskString) - if err != nil { - return err - } - err = tx.Bucket(TaskState).Put(idBytes, []byte(tes.State_QUEUED.String())) - if err != nil { - return err - } + tx.Bucket(TaskBucket).Put(idBytes, taskString) + tx.Bucket(TaskState).Put(idBytes, []byte(tes.State_QUEUED.String())) return nil }) if err != nil { @@ -148,11 +142,8 @@ func (taskBolt *BoltDB) WriteEvent(ctx context.Context, req *events.Event) error return err } - _ = taskBolt.db.Update(func(tx *bolt.Tx) error { - err := tx.Bucket(SysLogs).Put(idBytes, logbytes) - if err != nil { - return err - } + err = taskBolt.db.Update(func(tx *bolt.Tx) error { + tx.Bucket(SysLogs).Put(idBytes, logbytes) return nil }) } @@ -188,28 +179,19 @@ func transitionTaskState(tx *bolt.Tx, id string, target tes.State) error { case Canceled, Complete, ExecutorError, SystemError: // Remove from queue - err := tx.Bucket(TasksQueued).Delete(idBytes) - if err != nil { - return err - } + tx.Bucket(TasksQueued).Delete(idBytes) case Running, Initializing: if current != Unknown && current != Queued && current != Initializing { return fmt.Errorf("Unexpected transition from %s to %s", current.String(), target.String()) } - err := tx.Bucket(TasksQueued).Delete(idBytes) - if err != nil { - return err - } + tx.Bucket(TasksQueued).Delete(idBytes) default: return fmt.Errorf("Unknown target state: %s", target.String()) } - err := tx.Bucket(TaskState).Put(idBytes, []byte(target.String())) - if err != nil { - return err - } + tx.Bucket(TaskState).Put(idBytes, []byte(target.String())) return nil } diff --git a/database/boltdb/new.go b/database/boltdb/new.go index 4bf5fe9fd..105520390 100644 --- a/database/boltdb/new.go +++ b/database/boltdb/new.go @@ -40,8 +40,7 @@ var ExecutorStderr = []byte("executor-stderr") var Nodes = []byte("nodes") // SysLogs defeines the name of a bucket with maps -// -// task ID -> tes.TaskLog.SystemLogs +// task ID -> tes.TaskLog.SystemLogs var SysLogs = []byte("system-logs") // BoltDB provides handlers for gRPC endpoints. @@ -54,10 +53,7 @@ type BoltDB struct { // NewBoltDB returns a new instance of BoltDB, accessing the database at // the given path, and including the given ServerConfig. func NewBoltDB(conf config.BoltDB) (*BoltDB, error) { - err := fsutil.EnsurePath(conf.Path) - if err != nil { - return nil, err - } + fsutil.EnsurePath(conf.Path) db, err := bolt.Open(conf.Path, 0600, &bolt.Options{ Timeout: time.Second * 5, }) @@ -72,58 +68,31 @@ func (taskBolt *BoltDB) Init() error { // Check to make sure all the required buckets have been created return taskBolt.db.Update(func(tx *bolt.Tx) error { if tx.Bucket(TaskBucket) == nil { - _, err := tx.CreateBucket(TaskBucket) - if err != nil { - return err - } + tx.CreateBucket(TaskBucket) } if tx.Bucket(TasksQueued) == nil { - _, err := tx.CreateBucket(TasksQueued) - if err != nil { - return err - } + tx.CreateBucket(TasksQueued) } if tx.Bucket(TaskState) == nil { - _, err := tx.CreateBucket(TaskState) - if err != nil { - return err - } + tx.CreateBucket(TaskState) } if tx.Bucket(TasksLog) == nil { - _, err := tx.CreateBucket(TasksLog) - if err != nil { - return err - } + tx.CreateBucket(TasksLog) } if tx.Bucket(ExecutorLogs) == nil { - _, err := tx.CreateBucket(ExecutorLogs) - if err != nil { - return err - } + tx.CreateBucket(ExecutorLogs) } if tx.Bucket(ExecutorStdout) == nil { - _, err := tx.CreateBucket(ExecutorStdout) - if err != nil { - return err - } + tx.CreateBucket(ExecutorStdout) } if tx.Bucket(ExecutorStderr) == nil { - _, err := tx.CreateBucket(ExecutorStderr) - if err != nil { - return err - } + tx.CreateBucket(ExecutorStderr) } if tx.Bucket(Nodes) == nil { - _, err := tx.CreateBucket(Nodes) - if err != nil { - return err - } + tx.CreateBucket(Nodes) } if tx.Bucket(SysLogs) == nil { - _, err := tx.CreateBucket(SysLogs) - if err != nil { - return err - } + tx.CreateBucket(SysLogs) } return nil }) diff --git a/database/boltdb/scheduler.go b/database/boltdb/scheduler.go index 98d2070cc..f8245a714 100644 --- a/database/boltdb/scheduler.go +++ b/database/boltdb/scheduler.go @@ -17,23 +17,20 @@ func (taskBolt *BoltDB) queueTask(task *tes.Task) error { taskID := task.Id idBytes := []byte(taskID) - err := taskBolt.db.Update(func(tx *bolt.Tx) error { + taskBolt.db.Update(func(tx *bolt.Tx) error { err := tx.Bucket(TasksQueued).Put(idBytes, []byte{}) if err != nil { return err } return nil }) - if err != nil { - return fmt.Errorf("can't queue task: %s", err) - } return nil } // ReadQueue returns a slice of queued Tasks. Up to "n" tasks are returned. func (taskBolt *BoltDB) ReadQueue(n int) []*tes.Task { tasks := make([]*tes.Task, 0) - err := taskBolt.db.View(func(tx *bolt.Tx) error { + taskBolt.db.View(func(tx *bolt.Tx) error { // Iterate over the TasksQueued bucket, reading the first `n` tasks c := tx.Bucket(TasksQueued).Cursor() for k, _ := c.First(); k != nil && len(tasks) < n; k, _ = c.Next() { @@ -43,9 +40,6 @@ func (taskBolt *BoltDB) ReadQueue(n int) []*tes.Task { } return nil }) - if err != nil { - return nil - } return tasks } @@ -59,10 +53,7 @@ func (taskBolt *BoltDB) PutNode(ctx context.Context, node *scheduler.Node) (*sch existing := &scheduler.Node{} data := tx.Bucket(Nodes).Get([]byte(node.Id)) if data != nil { - err := proto.Unmarshal(data, existing) - if err != nil { - return err - } + proto.Unmarshal(data, existing) } if existing.GetVersion() != 0 && node.Version != existing.GetVersion() { diff --git a/database/boltdb/tes.go b/database/boltdb/tes.go index 92e8403cf..4b4c43099 100644 --- a/database/boltdb/tes.go +++ b/database/boltdb/tes.go @@ -37,10 +37,7 @@ func loadBasicTaskView(tx *bolt.Tx, id string, task *tes.Task) error { if b == nil { return tes.ErrNotFound } - err := proto.Unmarshal(b, task) - if err != nil { - return err - } + proto.Unmarshal(b, task) loadTaskLogs(tx, task) // remove content from inputs @@ -59,10 +56,7 @@ func loadFullTaskView(tx *bolt.Tx, id string, task *tes.Task) error { if b == nil { return tes.ErrNotFound } - err := proto.Unmarshal(b, task) - if err != nil { - return err - } + proto.Unmarshal(b, task) loadTaskLogs(tx, task) // Load executor stdout/err @@ -102,20 +96,14 @@ func loadTaskLogs(tx *bolt.Tx, task *tes.Task) { b := tx.Bucket(TasksLog).Get([]byte(task.Id)) if b != nil { - err := proto.Unmarshal(b, tasklog) - if err != nil { - return - } + proto.Unmarshal(b, tasklog) } for i := range task.Executors { o := tx.Bucket(ExecutorLogs).Get([]byte(fmt.Sprint(task.Id, i))) if o != nil { var execlog tes.ExecutorLog - err := proto.Unmarshal(o, &execlog) - if err != nil { - return - } + proto.Unmarshal(o, &execlog) tasklog.Logs = append(tasklog.Logs, &execlog) } } @@ -160,7 +148,7 @@ func (taskBolt *BoltDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest } pageSize := tes.GetPageSize(req.GetPageSize()) - err := taskBolt.db.View(func(tx *bolt.Tx) error { + taskBolt.db.View(func(tx *bolt.Tx) error { c := tx.Bucket(TaskBucket).Cursor() i := 0 @@ -206,9 +194,6 @@ func (taskBolt *BoltDB) ListTasks(ctx context.Context, req *tes.ListTasksRequest } return nil }) - if err != nil { - return nil, err - } out := tes.ListTasksResponse{ Tasks: tasks, diff --git a/database/elastic/scheduler.go b/database/elastic/scheduler.go index 24fee1136..5a5791464 100644 --- a/database/elastic/scheduler.go +++ b/database/elastic/scheduler.go @@ -94,10 +94,7 @@ func (es *Elastic) PutNode(ctx context.Context, node *scheduler.Node) (*schedule existing := &scheduler.Node{} if err == nil { - err := jsonpb.Unmarshal(bytes.NewReader(*res.Source), existing) - if err != nil { - return nil, err - } + jsonpb.Unmarshal(bytes.NewReader(*res.Source), existing) } err = scheduler.UpdateNode(ctx, es, node, existing) diff --git a/database/mongodb/events.go b/database/mongodb/events.go index 03fccbe49..f0b91b614 100644 --- a/database/mongodb/events.go +++ b/database/mongodb/events.go @@ -14,7 +14,7 @@ import ( // WriteEvent creates an event for the server to handle. func (db *MongoDB) WriteEvent(ctx context.Context, req *events.Event) error { - sess := db.sess.Copy() // Panics when database connection is closed + sess := db.sess.Copy() defer sess.Close() tasks := db.tasks(sess) diff --git a/database/mongodb/new.go b/database/mongodb/new.go index 3e857d11c..efa52aff4 100644 --- a/database/mongodb/new.go +++ b/database/mongodb/new.go @@ -113,8 +113,6 @@ func (db *MongoDB) Init() error { // Close closes the database session. func (db *MongoDB) Close() { - // TODO: Print stack info for calling function - // debug.PrintStack() if db.active { db.sess.Close() } diff --git a/events/executor.go b/events/executor.go index 7d231e09b..8998f613c 100644 --- a/events/executor.go +++ b/events/executor.go @@ -162,14 +162,8 @@ func LogTail(ctx context.Context, taskID string, attempt, index uint32, size int go func() { <-ctx.Done() - err := flush(stdoutbuf, Type_EXECUTOR_STDOUT) - if err != nil { - return - } - err = flush(stdoutbuf, Type_EXECUTOR_STDERR) - if err != nil { - return - } + flush(stdoutbuf, Type_EXECUTOR_STDOUT) + flush(stdoutbuf, Type_EXECUTOR_STDERR) }() return stdoutbuf, stderrbuf @@ -242,11 +236,8 @@ func StreamLogTail(ctx context.Context, taskID string, attempt, index uint32, si for e := range eventch { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - err := out.WriteEvent(ctx, e) + out.WriteEvent(ctx, e) cancel() - if err != nil { - return - } } }() @@ -271,18 +262,12 @@ func StreamLogTail(ctx context.Context, taskID string, attempt, index uint32, si flushboth(immediate) } case b := <-stdoutch: - _, err := stdoutbuf.Write(b) - if err != nil { - return - } + stdoutbuf.Write(b) if limiter.Allow() { flushboth(immediate) } case b := <-stderrch: - _, err := stderrbuf.Write(b) - if err != nil { - return - } + stderrbuf.Write(b) if limiter.Allow() { flushboth(immediate) } diff --git a/events/kafka.go b/events/kafka.go index 27fe5856e..52768142b 100644 --- a/events/kafka.go +++ b/events/kafka.go @@ -82,10 +82,7 @@ func NewKafkaReader(ctx context.Context, conf config.Kafka, w Writer) (*KafkaRea // TODO continue } - err = w.WriteEvent(context.Background(), ev) - if err != nil { - return - } + w.WriteEvent(context.Background(), ev) } }() diff --git a/events/pubsub.go b/events/pubsub.go index 0a5d824bb..952007d70 100644 --- a/events/pubsub.go +++ b/events/pubsub.go @@ -97,21 +97,15 @@ func ReadPubSub(ctx context.Context, conf config.PubSub, subname string, w Write } } - err = sub.Receive(ctx, func(ctx oldctx.Context, m *pubsub.Message) { + sub.Receive(ctx, func(ctx oldctx.Context, m *pubsub.Message) { ev := &Event{} - err := Unmarshal(m.Data, ev) - if err != nil { - return - } - err = w.WriteEvent(context.Background(), ev) + Unmarshal(m.Data, ev) if err != nil { return } + w.WriteEvent(context.Background(), ev) m.Ack() }) - if err != nil { - return err - } return nil } diff --git a/logger/text_formatter_test.go b/logger/text_formatter_test.go index 85e20aa5b..956837714 100644 --- a/logger/text_formatter_test.go +++ b/logger/text_formatter_test.go @@ -25,8 +25,5 @@ func TestFormatNilProtoField(t *testing.T) { "ns": "TEST", "nil value": nt, }) - _, err := tf.Format(entry) - if err != nil { - t.Error(err) - } + tf.Format(entry) } diff --git a/server/server.go b/server/server.go index cbb322b94..a52c0e3ec 100644 --- a/server/server.go +++ b/server/server.go @@ -176,10 +176,7 @@ func (s *Server) Serve(pctx context.Context) error { <-ctx.Done() grpcServer.GracefulStop() - err = httpServer.Shutdown(context.TODO()) - if err != nil { - return err - } + httpServer.Shutdown(context.TODO()) return srverr } diff --git a/storage/ftp.go b/storage/ftp.go index 72966875b..e847b35bd 100644 --- a/storage/ftp.go +++ b/storage/ftp.go @@ -135,15 +135,8 @@ func connect(url string, conf config.FTPStorage) (*ftpclient, error) { } func (b *ftpclient) Close() { - err := b.client.Logout() - if err != nil { - return - } - - err = b.client.Quit() - if err != nil { - return - } + b.client.Logout() + b.client.Quit() } // Stat returns information about the object at the given storage URL. diff --git a/storage/http.go b/storage/http.go index 791777aed..20fe5befa 100644 --- a/storage/http.go +++ b/storage/http.go @@ -38,7 +38,7 @@ func (b *HTTP) Stat(ctx context.Context, url string) (*Object, error) { if err != nil { return nil, fmt.Errorf("httpStorage: creating HEAD request: %s", err) } - _ = req.WithContext(ctx) + req.WithContext(ctx) resp, err := b.client.Do(req) if err != nil { @@ -71,7 +71,7 @@ func (b *HTTP) Get(ctx context.Context, url, path string) (*Object, error) { if err != nil { return nil, fmt.Errorf("httpStorage: creating GET request: %s", err) } - _ = req.WithContext(ctx) + req.WithContext(ctx) resp, err := b.client.Do(req) if err != nil { diff --git a/storage/local_test.go b/storage/local_test.go index 50cfaf796..587b6443e 100644 --- a/storage/local_test.go +++ b/storage/local_test.go @@ -37,10 +37,7 @@ func TestLocalGet(t *testing.T) { // File test ip := path.Join(tmp, "input.txt") cp := path.Join(tmp, "container.txt") - err = ioutil.WriteFile(ip, []byte("foo"), os.ModePerm) - if err != nil { - t.Fatal(err) - } + ioutil.WriteFile(ip, []byte("foo"), os.ModePerm) _, gerr := l.Get(ctx, "file://"+ip, cp) if gerr != nil { @@ -166,10 +163,7 @@ func TestLocalGetPath(t *testing.T) { ip := path.Join(tmp, "input.txt") cp := path.Join(tmp, "container.txt") - err = ioutil.WriteFile(ip, []byte("foo"), os.ModePerm) - if err != nil { - t.Fatal(err) - } + ioutil.WriteFile(ip, []byte("foo"), os.ModePerm) _, gerr := l.Get(ctx, ip, cp) if gerr != nil { @@ -197,10 +191,7 @@ func TestLocalPut(t *testing.T) { // File test cp := path.Join(tmp, "container.txt") op := path.Join(tmp, "output.txt") - err = ioutil.WriteFile(cp, []byte("foo"), os.ModePerm) - if err != nil { - t.Fatal(err) - } + ioutil.WriteFile(cp, []byte("foo"), os.ModePerm) _, gerr := l.Put(ctx, "file://"+op, cp) if gerr != nil { @@ -227,10 +218,7 @@ func TestLocalPutPath(t *testing.T) { cp := path.Join(tmp, "container.txt") op := path.Join(tmp, "output.txt") - err = ioutil.WriteFile(cp, []byte("foo"), os.ModePerm) - if err != nil { - t.Fatal(err) - } + ioutil.WriteFile(cp, []byte("foo"), os.ModePerm) _, gerr := l.Put(ctx, op, cp) if gerr != nil { @@ -264,14 +252,8 @@ func TestSameFile(t *testing.T) { cp := path.Join(tmp, "output.txt") cp2 := path.Join(tmp, "output2.txt") op := path.Join(tmpOut, "output.txt") - err = ioutil.WriteFile(cp, []byte("foo"), os.ModePerm) - if err != nil { - t.Fatal(err) - } - err = ioutil.WriteFile(cp2, []byte("bar"), os.ModePerm) - if err != nil { - t.Fatal(err) - } + ioutil.WriteFile(cp, []byte("foo"), os.ModePerm) + ioutil.WriteFile(cp2, []byte("bar"), os.ModePerm) err = linkFile(ctx, cp, op) if err != nil { diff --git a/storage/storage_test.go b/storage/storage_test.go index 9bb4952e2..8af61be10 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -118,12 +118,12 @@ func TestUrlParsing(t *testing.T) { t.Error("wrong key") } - _, err = b.parse("gs://1000genomes/README.analysis_history") + url, err = b.parse("gs://1000genomes/README.analysis_history") if _, ok := err.(*ErrUnsupportedProtocol); !ok { t.Error("expected ErrUnsupportedProtocol") } - _, err = b.parse("s3://") + url, err = b.parse("s3://") if _, ok := err.(*ErrInvalidURL); !ok { t.Error("expected ErrInvalidURL") } @@ -179,12 +179,12 @@ func TestUrlParsing(t *testing.T) { t.Error("wrong key") } - _, _, err = ab.parse("gs://1000genomes/README.analysis_history") + url, _, err = ab.parse("gs://1000genomes/README.analysis_history") if _, ok := err.(*ErrUnsupportedProtocol); !ok { t.Error("expected ErrUnsupportedProtocol") } - _, _, err = ab.parse("s3://") + url, _, err = ab.parse("s3://") if _, ok := err.(*ErrInvalidURL); !ok { t.Error("expected ErrInvalidURL") } @@ -210,12 +210,12 @@ func TestUrlParsing(t *testing.T) { t.Error("wrong key") } - _, err = gb.parse("s3://1000genomes/README.analysis_history") + url, err = gb.parse("s3://1000genomes/README.analysis_history") if _, ok := err.(*ErrUnsupportedProtocol); !ok { t.Error("expected ErrUnsupportedProtocol") } - _, err = gb.parse("gs://") + url, err = gb.parse("gs://") if _, ok := err.(*ErrInvalidURL); !ok { t.Error("expected ErrInvalidURL") } @@ -238,12 +238,12 @@ func TestUrlParsing(t *testing.T) { t.Error("wrong key") } - _, err = sb.parse("s3://1000genomes/README.analysis_history") + url, err = sb.parse("s3://1000genomes/README.analysis_history") if _, ok := err.(*ErrUnsupportedProtocol); !ok { t.Error("expected ErrUnsupportedProtocol") } - _, err = sb.parse("swift://") + url, err = sb.parse("swift://") if _, ok := err.(*ErrInvalidURL); !ok { t.Error("expected ErrInvalidURL") } diff --git a/tes/client.go b/tes/client.go index f2c7fe9f2..391c3b04a 100644 --- a/tes/client.go +++ b/tes/client.go @@ -60,7 +60,7 @@ func (c *Client) GetTask(ctx context.Context, req *GetTaskRequest) (*Task, error // Send request u := c.address + "/v1/tasks/" + req.Id + "?view=" + req.View hreq, _ := http.NewRequest("GET", u, nil) - _ = hreq.WithContext(ctx) + hreq.WithContext(ctx) hreq.SetBasicAuth(c.User, c.Password) body, err := util.CheckHTTPResponse(c.client.Do(hreq)) if err != nil { @@ -96,8 +96,7 @@ func (c *Client) ListTasks(ctx context.Context, req *ListTasksRequest) (*ListTas // Send request u := c.address + "/v1/tasks?" + v.Encode() hreq, _ := http.NewRequest("GET", u, nil) - hreq.SetBasicAuth(c.User, c.Password) - // hreq.WithContext(ctx) + hreq.WithContext(ctx) hreq.SetBasicAuth(c.User, c.Password) body, err := util.CheckHTTPResponse(c.client.Do(hreq)) if err != nil { @@ -148,7 +147,7 @@ func (c *Client) CreateTask(ctx context.Context, task *Task) (*CreateTaskRespons func (c *Client) CancelTask(ctx context.Context, req *CancelTaskRequest) (*CancelTaskResponse, error) { u := c.address + "/v1/tasks/" + req.Id + ":cancel" hreq, _ := http.NewRequest("POST", u, nil) - // hreq.WithContext(ctx) + hreq.WithContext(ctx) hreq.Header.Add("Content-Type", "application/json") hreq.SetBasicAuth(c.User, c.Password) body, err := util.CheckHTTPResponse(c.client.Do(hreq)) @@ -169,7 +168,7 @@ func (c *Client) CancelTask(ctx context.Context, req *CancelTaskRequest) (*Cance func (c *Client) GetServiceInfo(ctx context.Context, req *GetServiceInfoRequest) (*ServiceInfo, error) { u := c.address + "/v1/service-info" hreq, _ := http.NewRequest("GET", u, nil) - // hreq.WithContext(ctx) + hreq.WithContext(ctx) hreq.SetBasicAuth(c.User, c.Password) body, err := util.CheckHTTPResponse(c.client.Do(hreq)) if err != nil { diff --git a/tes/urlvalues.go b/tes/urlvalues.go index 44b3e9830..b609857ed 100644 --- a/tes/urlvalues.go +++ b/tes/urlvalues.go @@ -11,11 +11,11 @@ func addString(u url.Values, key, value string) { } } -// func addUInt32(u url.Values, key string, value uint32) { -// if value != 0 { -// u.Add(key, fmt.Sprint(value)) -// } -// } +func addUInt32(u url.Values, key string, value uint32) { + if value != 0 { + u.Add(key, fmt.Sprint(value)) + } +} func addInt32(u url.Values, key string, value int32) { if value != 0 { diff --git a/tes/utils.go b/tes/utils.go index 5288e99ee..5cd2b9caf 100644 --- a/tes/utils.go +++ b/tes/utils.go @@ -107,10 +107,7 @@ func TerminalState(s State) bool { // GetBasicView returns the basic view of a task. func (task *Task) GetBasicView() *Task { view := &Task{} - err := deepcopy.Copy(view, task) - if err != nil { - return nil - } + deepcopy.Copy(view, task) // remove contents from inputs for _, v := range view.Inputs { diff --git a/tests/config_utils.go b/tests/config_utils.go index 45e882481..0cceb3273 100644 --- a/tests/config_utils.go +++ b/tests/config_utils.go @@ -72,10 +72,7 @@ func TestifyConfig(conf config.Config) config.Config { storageDir, _ := ioutil.TempDir("./test_tmp", "funnel-test-storage-") wd, _ := os.Getwd() - err := fsutil.EnsureDir(storageDir) - if err != nil { - panic(err) - } + fsutil.EnsureDir(storageDir) conf.LocalStorage = config.LocalStorage{ AllowedDirs: []string{storageDir, wd}, @@ -102,10 +99,7 @@ func RandomPortConfig(conf config.Config) config.Config { // TempDirConfig returns a modified config with workdir and db path set to a temp. directory. func TempDirConfig(conf config.Config) config.Config { - err := os.MkdirAll("./test_tmp", os.ModePerm) - if err != nil { - panic(err) - } + os.MkdirAll("./test_tmp", os.ModePerm) f, _ := ioutil.TempDir("./test_tmp", "funnel-test-") conf.Worker.WorkDir = f conf.BoltDB.Path = path.Join(f, "funnel.db") diff --git a/tests/core/basic_test.go b/tests/core/basic_test.go index b4e424330..7ab7a0b28 100644 --- a/tests/core/basic_test.go +++ b/tests/core/basic_test.go @@ -281,7 +281,6 @@ func TestGetTaskView(t *testing.T) { } // TODO this is a bit hacky for now because we're reusing the same -// // server + DB for all the e2e tests, so ListTasks gets the // results of all of those. It works for the moment, but // should probably run against a clean environment. @@ -530,10 +529,7 @@ func TestSingleCharLog(t *testing.T) { if task.Logs[0].Logs[0].Stdout != "a\n" { t.Fatal("Missing logs") } - err := fun.Cancel(id) - if err != nil { - t.Fatal(err) - } + fun.Cancel(id) } // Test that a completed task cannot change state. @@ -562,10 +558,7 @@ func TestCancel(t *testing.T) { --sh 'echo never' `) fun.WaitForExec(id, 1) - err := fun.Cancel(id) - if err != nil { - t.Fatal(err) - } + fun.Cancel(id) fun.WaitForDockerDestroy(id + "-0") task := fun.Get(id) if task.State != tes.State_CANCELED { @@ -607,10 +600,7 @@ func TestExecutorLogLength(t *testing.T) { `) fun.WaitForExec(id, 2) task := fun.Get(id) - err := fun.Cancel(id) - if err != nil { - t.Fatal(err) - } + fun.Cancel(id) if len(task.Logs[0].Logs) != 2 { t.Fatal("Unexpected executor log count") } @@ -631,10 +621,7 @@ func TestMarkCompleteBug(t *testing.T) { if task.State != tes.State_RUNNING { t.Fatal("Unexpected task state") } - err := fun.Cancel(id) - if err != nil { - t.Error(err) - } + fun.Cancel(id) } func TestTaskStartEndTimeLogs(t *testing.T) { diff --git a/tests/core/worker_test.go b/tests/core/worker_test.go index 090da0beb..1290b142a 100644 --- a/tests/core/worker_test.go +++ b/tests/core/worker_test.go @@ -184,10 +184,7 @@ func TestLargeLogRate(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - err := w.Run(ctx) - if err != nil { - t.Log(err) - } + w.Run(ctx) // Given the difficulty of timing how long it task a task + docker container to start, // we just check that a small amount of events were generated. @@ -226,10 +223,7 @@ func TestZeroLogRate(t *testing.T) { EventWriter: m, } - err := w.Run(context.Background()) - if err != nil { - t.Log(err) - } + w.Run(context.Background()) time.Sleep(time.Second) diff --git a/tests/funnel_utils.go b/tests/funnel_utils.go index 5ada6884a..9f03d82a6 100644 --- a/tests/funnel_utils.go +++ b/tests/funnel_utils.go @@ -111,10 +111,7 @@ func (f *Funnel) Cleanup() { // StartServer starts the server func (f *Funnel) StartServer() { go func() { - err := f.Srv.Run(context.Background()) - if err != nil { - return - } + f.Srv.Run(context.Background()) }() err := f.PollForServerStart() @@ -332,7 +329,7 @@ func (f *Funnel) StartServerInDocker(containerName, imageName string, extraArgs gopath := os.Getenv("GOPATH") if gopath == "" { - funnelBinary, _ = filepath.Abs("../../funnel") + funnelBinary, err = filepath.Abs("../../funnel") } else { if runtime.GOOS == "linux" { funnelBinary, err = filepath.Abs(filepath.Join( @@ -351,14 +348,8 @@ func (f *Funnel) StartServerInDocker(containerName, imageName string, extraArgs // write config file configPath, _ := filepath.Abs(filepath.Join(f.Conf.Worker.WorkDir, "config.yml")) - err = config.ToYamlFile(f.Conf, configPath) - if err != nil { - return - } - err = os.Chmod(configPath, 0644) - if err != nil { - return - } + config.ToYamlFile(f.Conf, configPath) + os.Chmod(configPath, 0644) httpPort, _ := strconv.ParseInt(f.Conf.Server.HTTPPort, 0, 32) rpcPort, _ := strconv.ParseInt(f.Conf.Server.RPCPort, 0, 32) @@ -366,7 +357,7 @@ func (f *Funnel) StartServerInDocker(containerName, imageName string, extraArgs // detect gid of /var/run/docker.sock fi, err := os.Stat("/var/run/docker.sock") if err != nil { - return + panic(err) } gid := fi.Sys().(*syscall.Stat_t).Gid @@ -384,10 +375,13 @@ func (f *Funnel) StartServerInDocker(containerName, imageName string, extraArgs args = append(args, extraArgs...) args = append(args, imageName, "funnel", "server", "run", "--config", "/opt/funnel_config.yml") + log.Info("DEBUG: args: ", args) cmd := exec.Command("docker", args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - log.Info("Running command", "cmd", "docker "+strings.Join(args, " ")) + log.Info("Running command!", "cmd", "docker "+strings.Join(args, " ")) + + // /test_tmp/funnel-test-1186572504/cj1diie9qp6c7392e3f0/tmp // start server done := make(chan error, 1) @@ -414,24 +408,24 @@ func (f *Funnel) StartServerInDocker(containerName, imageName string, extraArgs case <-ready: break } + return } -// TODO: commenting out for now for linting check -// func (f *Funnel) findTestServerContainers() []string { -// res := []string{} -// containers, err := f.Docker.ContainerList(context.Background(), dockerTypes.ContainerListOptions{}) -// if err != nil { -// panic(err) -// } -// for _, c := range containers { -// for _, n := range c.Names { -// if strings.Contains(n, "funnel-test-server-") { -// res = append(res, n) -// } -// } -// } -// return res -// } +func (f *Funnel) findTestServerContainers() []string { + res := []string{} + containers, err := f.Docker.ContainerList(context.Background(), dockerTypes.ContainerListOptions{}) + if err != nil { + panic(err) + } + for _, c := range containers { + for _, n := range c.Names { + if strings.Contains(n, "funnel-test-server-") { + res = append(res, n) + } + } + } + return res +} func (f *Funnel) killTestServerContainers(ids []string) { for _, n := range ids { @@ -446,6 +440,7 @@ func (f *Funnel) killTestServerContainers(ids []string) { func (f *Funnel) CleanupTestServerContainer(containerName string) { f.Cleanup() f.killTestServerContainers([]string{containerName}) + return } // HelloWorld is a simple, valid task that is easy to reuse in tests. diff --git a/tests/gridengine/gridengine_test.go b/tests/gridengine/gridengine_test.go index f0eceb414..4d593a59d 100644 --- a/tests/gridengine/gridengine_test.go +++ b/tests/gridengine/gridengine_test.go @@ -35,6 +35,7 @@ func TestMain(m *testing.M) { }() exit = m.Run() + return } func TestHelloWorld(t *testing.T) { diff --git a/tests/pbs/pbs_test.go b/tests/pbs/pbs_test.go index 1b92d3d45..e9f800016 100644 --- a/tests/pbs/pbs_test.go +++ b/tests/pbs/pbs_test.go @@ -32,6 +32,7 @@ func TestMain(m *testing.M) { }() exit = m.Run() + return } func TestHelloWorld(t *testing.T) { diff --git a/tests/perf/perf_test.go b/tests/perf/perf_test.go index b090751b0..a5ce374a8 100644 --- a/tests/perf/perf_test.go +++ b/tests/perf/perf_test.go @@ -89,7 +89,7 @@ func BenchmarkRunConcurrentWithFakeNodes(b *testing.B) { for { select { case <-ticker.C: - _, err := cli.WriteEvent(context.Background(), &events.Event{ + cli.WriteEvent(context.Background(), &events.Event{ Id: id, Attempt: 0, Index: 0, @@ -99,9 +99,6 @@ func BenchmarkRunConcurrentWithFakeNodes(b *testing.B) { }, Timestamp: time.Now().Format(time.RFC3339Nano), }) - if err != nil { - return - } case <-done: return } diff --git a/tests/scheduler/node_health_test.go b/tests/scheduler/node_health_test.go index d6658a888..9874dc9f2 100644 --- a/tests/scheduler/node_health_test.go +++ b/tests/scheduler/node_health_test.go @@ -67,10 +67,7 @@ func TestNodeInitFail(t *testing.T) { } time.Sleep(time.Duration(conf.Scheduler.NodeInitTimeout)) - err = srv.Scheduler.CheckNodes() - if err != nil { - t.Error(err) - } + srv.Scheduler.CheckNodes() resp, err := srv.Scheduler.Nodes.ListNodes(ctx, &scheduler.ListNodesRequest{}) if err != nil { @@ -102,16 +99,10 @@ func TestNodeDeadTimeout(t *testing.T) { t.Error(err) } - err = srv.Scheduler.CheckNodes() - if err != nil { - t.Error(err) - } + srv.Scheduler.CheckNodes() time.Sleep(time.Duration(conf.Scheduler.NodeDeadTimeout)) - err = srv.Scheduler.CheckNodes() - if err != nil { - t.Error(err) - } + srv.Scheduler.CheckNodes() resp, err := srv.Scheduler.Nodes.ListNodes(ctx, &scheduler.ListNodesRequest{}) if err != nil { diff --git a/tests/scheduler/node_test.go b/tests/scheduler/node_test.go index 4264a5e23..7aee5a02e 100644 --- a/tests/scheduler/node_test.go +++ b/tests/scheduler/node_test.go @@ -37,17 +37,9 @@ func TestNodeGoneOnCanceledContext(t *testing.T) { } ctx, cancel := context.WithCancel(bg) defer cancel() - go func() { - err := n.Run(ctx) - if err != nil { - t.Error(err) - } - }() + go n.Run(ctx) - err = srv.Scheduler.CheckNodes() - if err != nil { - t.Fatal(err) - } + srv.Scheduler.CheckNodes() time.Sleep(time.Duration(conf.Node.UpdateRate * 2)) resp, err := srv.Scheduler.Nodes.ListNodes(bg, &scheduler.ListNodesRequest{}) @@ -62,10 +54,7 @@ func TestNodeGoneOnCanceledContext(t *testing.T) { cancel() time.Sleep(time.Duration(conf.Node.UpdateRate * 2)) - err = srv.Scheduler.CheckNodes() - if err != nil { - t.Error(err) - } + srv.Scheduler.CheckNodes() resp, err = srv.Scheduler.Nodes.ListNodes(bg, &scheduler.ListNodesRequest{}) if err != nil { @@ -110,13 +99,7 @@ func TestManualBackend(t *testing.T) { if err != nil { t.Fatal("failed to create node", err) } - - go func() { - err := n.Run(ctx) - if err != nil { - t.Error(err) - } - }() + go n.Run(ctx) // run tasks and check that they all complete tasks := []string{} @@ -200,86 +183,47 @@ func TestNodeCleanup(t *testing.T) { e := srv.Server.Events t1 := tests.HelloWorld() - _, err := srv.Server.Tasks.CreateTask(ctx, t1) - if err != nil { - t.Error(err) - } - _, err = e.WriteEvent(ctx, events.NewState(t1.Id, tes.Complete)) - if err != nil { - t.Error(err) - } + srv.Server.Tasks.CreateTask(ctx, t1) + e.WriteEvent(ctx, events.NewState(t1.Id, tes.Complete)) t2 := tests.HelloWorld() - _, err = srv.Server.Tasks.CreateTask(ctx, t2) - if err != nil { - t.Error(err) - } - _, err = e.WriteEvent(ctx, events.NewState(t2.Id, tes.Running)) - if err != nil { - t.Error(err) - } + srv.Server.Tasks.CreateTask(ctx, t2) + e.WriteEvent(ctx, events.NewState(t2.Id, tes.Running)) t3 := tests.HelloWorld() - _, err = srv.Server.Tasks.CreateTask(ctx, t3) - if err != nil { - t.Error(err) - } - _, err = e.WriteEvent(ctx, events.NewState(t3.Id, tes.SystemError)) - if err != nil { - t.Error(err) - } + srv.Server.Tasks.CreateTask(ctx, t3) + e.WriteEvent(ctx, events.NewState(t3.Id, tes.SystemError)) t4 := tests.HelloWorld() - _, err = srv.Server.Tasks.CreateTask(ctx, t4) - if err != nil { - t.Error(err) - } - _, err = e.WriteEvent(ctx, events.NewState(t4.Id, tes.Running)) - if err != nil { - t.Error(err) - } + srv.Server.Tasks.CreateTask(ctx, t4) + e.WriteEvent(ctx, events.NewState(t4.Id, tes.Running)) t5 := tests.HelloWorld() - _, err = srv.Server.Tasks.CreateTask(ctx, t5) - if err != nil { - t.Error(err) - } - _, err = e.WriteEvent(ctx, events.NewState(t5.Id, tes.Running)) - if err != nil { - t.Error(err) - } + srv.Server.Tasks.CreateTask(ctx, t5) + e.WriteEvent(ctx, events.NewState(t5.Id, tes.Running)) - _, err = srv.Scheduler.Nodes.PutNode(ctx, &scheduler.Node{ + srv.Scheduler.Nodes.PutNode(ctx, &scheduler.Node{ Id: "test-gone-node-cleanup-restart-1", State: scheduler.NodeState_GONE, TaskIds: []string{t1.Id, t2.Id, t3.Id}, }) - if err != nil { - t.Error(err) - } - _, err = srv.Scheduler.Nodes.PutNode(ctx, &scheduler.Node{ + srv.Scheduler.Nodes.PutNode(ctx, &scheduler.Node{ Id: "test-gone-node-cleanup-restart-2", State: scheduler.NodeState_GONE, TaskIds: []string{t4.Id}, }) - if err != nil { - t.Error(err) - } - _, err = srv.Scheduler.Nodes.PutNode(ctx, &scheduler.Node{ + srv.Scheduler.Nodes.PutNode(ctx, &scheduler.Node{ Id: "test-gone-node-cleanup-restart-3", State: scheduler.NodeState_ALIVE, TaskIds: []string{t5.Id}, }) - if err != nil { - t.Error(err) - } ns, _ := srv.Scheduler.Nodes.ListNodes(ctx, &scheduler.ListNodesRequest{}) log.Info("nodes before", ns) - err = srv.Scheduler.CheckNodes() + err := srv.Scheduler.CheckNodes() if err != nil { t.Error(err) } @@ -342,17 +286,9 @@ func TestNodeDrain(t *testing.T) { } ctx, cancel := context.WithCancel(bg) defer cancel() - go func() { - err := n.Run(ctx) - if err != nil { - t.Error(err) - } - }() + go n.Run(ctx) - err = srv.Scheduler.CheckNodes() - if err != nil { - t.Fatal(err) - } + srv.Scheduler.CheckNodes() time.Sleep(time.Duration(conf.Node.UpdateRate * 10)) resp, err := srv.Scheduler.Nodes.ListNodes(bg, &scheduler.ListNodesRequest{}) @@ -370,10 +306,7 @@ func TestNodeDrain(t *testing.T) { first := srv.Run("echo") time.Sleep(time.Duration(conf.Node.UpdateRate * 10)) - err = srv.Scheduler.CheckNodes() - if err != nil { - t.Fatal(err) - } + srv.Scheduler.CheckNodes() resp, err = srv.Scheduler.Nodes.ListNodes(bg, &scheduler.ListNodesRequest{}) if err != nil { t.Fatal(err) diff --git a/tests/scheduler/scheduler_test.go b/tests/scheduler/scheduler_test.go index eff226797..a5e645189 100644 --- a/tests/scheduler/scheduler_test.go +++ b/tests/scheduler/scheduler_test.go @@ -52,10 +52,7 @@ func TestCancel(t *testing.T) { f.StartServer() id := f.Run(`'sleep 1000'`) - err := f.Cancel(id) - if err != nil { - t.Error(err) - } + f.Cancel(id) task := f.Get(id) if task.State != tes.Canceled { t.Error("expected canceled state") diff --git a/tests/slurm/slurm_test.go b/tests/slurm/slurm_test.go index 4439de36d..68107910c 100644 --- a/tests/slurm/slurm_test.go +++ b/tests/slurm/slurm_test.go @@ -36,6 +36,7 @@ func TestMain(m *testing.M) { }() exit = m.Run() + return } func TestHelloWorld(t *testing.T) { diff --git a/tests/storage/amazon_s3_test.go b/tests/storage/amazon_s3_test.go index 0d451160b..4de5da0a0 100644 --- a/tests/storage/amazon_s3_test.go +++ b/tests/storage/amazon_s3_test.go @@ -40,10 +40,7 @@ func TestAmazonS3Storage(t *testing.T) { t.Fatal("error creating test bucket:", err) } defer func() { - err = client.deleteBucket(testBucket) - if err != nil { - t.Fatal("error deleting test bucket:", err) - } + client.deleteBucket(testBucket) }() protocol := "s3://" diff --git a/tests/storage/gs_test.go b/tests/storage/gs_test.go index 2571c30b4..fd1691259 100644 --- a/tests/storage/gs_test.go +++ b/tests/storage/gs_test.go @@ -50,10 +50,7 @@ func TestGoogleStorage(t *testing.T) { t.Fatal(err) } defer func() { - err = client.deleteBucket(testBucket) - if err != nil { - t.Fatal("error deleting test bucket:", err) - } + client.deleteBucket(testBucket) }() protocol := "gs://" diff --git a/tests/storage/multi_s3_test.go b/tests/storage/multi_s3_test.go index a56fbe355..69af08928 100644 --- a/tests/storage/multi_s3_test.go +++ b/tests/storage/multi_s3_test.go @@ -41,10 +41,7 @@ func TestMultiS3Storage(t *testing.T) { t.Fatal("error creating test bucket:", err) } defer func() { - err = gclient1.deleteBucket(testBucket) - if err != nil { - t.Fatal("error deleting test bucket:", err) - } + gclient1.deleteBucket(testBucket) }() gconf2 := conf.GenericS3[1] @@ -57,10 +54,7 @@ func TestMultiS3Storage(t *testing.T) { t.Fatal("error creating test bucket:", err) } defer func() { - err = gclient2.deleteBucket(testBucket) - if err != nil { - t.Fatal("error deleting test bucket:", err) - } + gclient2.deleteBucket(testBucket) }() // Stage input files diff --git a/tests/storage/storage_test.go b/tests/storage/storage_test.go index c23c1196f..54357f3b5 100644 --- a/tests/storage/storage_test.go +++ b/tests/storage/storage_test.go @@ -95,13 +95,13 @@ func TestBrokenSymlinkInput(t *testing.T) { } /* -Test the case where a container creates a symlink in an output path. -From the view of the host system where Funnel is running, this creates -a broken link, because the source of the symlink is a path relative -to the container filesystem. - -Funnel can fix some of these cases using volume definitions, which -is being tested here. + Test the case where a container creates a symlink in an output path. + From the view of the host system where Funnel is running, this creates + a broken link, because the source of the symlink is a path relative + to the container filesystem. + + Funnel can fix some of these cases using volume definitions, which + is being tested here. */ func TestSymlinkOutput(t *testing.T) { tests.SetLogOutput(log, t) @@ -156,10 +156,7 @@ func TestOverwriteOutput(t *testing.T) { func TestEmptyDir(t *testing.T) { tests.SetLogOutput(log, t) - err := os.Mkdir(path.Join(fun.StorageDir, "test_in"), 0777) - if err != nil { - t.Error(err) - } + os.Mkdir(path.Join(fun.StorageDir, "test_in"), 0777) id := fun.Run(` --sh 'echo hello' -I in={{ .storage }}/test_in diff --git a/tests/storage/swift_test.go b/tests/storage/swift_test.go index 86c3c8a91..dabe3eb01 100644 --- a/tests/storage/swift_test.go +++ b/tests/storage/swift_test.go @@ -37,10 +37,7 @@ func TestSwiftStorage(t *testing.T) { t.Fatal("error creating test bucket:", err) } defer func() { - err := client.deleteBucket(testBucket) - if err != nil { - t.Fatal("error deleting test bucket:", err) - } + client.deleteBucket(testBucket) }() protocol := "swift://" diff --git a/util/dockerutil/docker.go b/util/dockerutil/docker.go index f34de50d5..6f25c49d1 100644 --- a/util/dockerutil/docker.go +++ b/util/dockerutil/docker.go @@ -34,8 +34,7 @@ func NewDockerClient() (*client.Client, error) { } // Error message example: // Error getting metadata for container: Error response from daemon: client is newer than server (client API version: 1.26, server API version: 1.24) - // Hardcoding Docker version until version[1] returns valid version - os.Setenv("DOCKER_API_VERSION", "1.24") + os.Setenv("DOCKER_API_VERSION", version[1]) return NewDockerClient() } } diff --git a/util/fsutil/io.go b/util/fsutil/io.go index f331744b5..9bd01d158 100644 --- a/util/fsutil/io.go +++ b/util/fsutil/io.go @@ -18,10 +18,7 @@ func Reader(ctx context.Context, r io.Reader) io.Reader { SetReadDeadline(time.Time) error } if d, ok := r.(deadliner); ok { - err := d.SetReadDeadline(deadline) - if err != nil { - return nil - } + d.SetReadDeadline(deadline) } } return reader{ctx, r} @@ -55,10 +52,7 @@ func Writer(ctx context.Context, w io.Writer) io.Writer { SetWriteDeadline(time.Time) error } if d, ok := w.(deadliner); ok { - err := d.SetWriteDeadline(deadline) - if err != nil { - return nil - } + d.SetWriteDeadline(deadline) } } return writer{ctx, w} diff --git a/util/idle_timeout.go b/util/idle_timeout.go index 05edc95c0..cb3234b27 100644 --- a/util/idle_timeout.go +++ b/util/idle_timeout.go @@ -6,18 +6,18 @@ import "time" // Start() and Stop() are used to control the timer, and Done() is used to // detect when the timeout has been reached. // -// in := make(chan int) -// requestInput(in) -// t := IdleTimeoutAfter(time.Second * 10) -// for { -// select { -// case <-t.Done(): -// // ... code to respond to timeout -// case <-in: -// // Reset the timeout. -// t.Start() -// } -// } +// in := make(chan int) +// requestInput(in) +// t := IdleTimeoutAfter(time.Second * 10) +// for { +// select { +// case <-t.Done(): +// // ... code to respond to timeout +// case <-in: +// // Reset the timeout. +// t.Start() +// } +//} type IdleTimeout interface { Done() <-chan time.Time Start() @@ -51,12 +51,12 @@ type timerTimeout struct { // Done returns a channel which can be used to wait for the timeout. // -// t := IdleTimeoutAfter(time.Second * 10) -// for { -// select { -// case <-t.Done(): -// // ... code to respond to timeout -// } +// t := IdleTimeoutAfter(time.Second * 10) +// for { +// select { +// case <-t.Done(): +// // ... code to respond to timeout +// } // } func (t *timerTimeout) Done() <-chan time.Time { if t.timer != nil { diff --git a/util/retry_test.go b/util/retry_test.go index 9df0f5421..50fdbb535 100644 --- a/util/retry_test.go +++ b/util/retry_test.go @@ -20,13 +20,10 @@ func TestRetrier(t *testing.T) { bg := context.Background() i := 0 - err := r.Retry(bg, func() error { + r.Retry(bg, func() error { i++ return fmt.Errorf("always error") }) - if err == nil { - t.Error("Expected error") - } if i != 3 { t.Error("unexpected number of retries", i) } @@ -35,12 +32,9 @@ func TestRetrier(t *testing.T) { t.Error("unexpected next backoff", next) } - err = r.Retry(bg, func() error { + r.Retry(bg, func() error { return nil }) - if err != nil { - t.Error("unexpected error", err) - } next = r.backoff.NextBackOff() if next != time.Millisecond*10 { t.Error("unexpected next backoff", next) diff --git a/util/ring/ring_test.go b/util/ring/ring_test.go index fef1ea45d..225d9669b 100644 --- a/util/ring/ring_test.go +++ b/util/ring/ring_test.go @@ -55,10 +55,7 @@ func TestBuffer_Reset(t *testing.T) { func TestBuffer_ResetNewBytes(t *testing.T) { buf := NewBuffer(4) - _, err := buf.Write([]byte("12345")) - if err != nil { - t.Fatalf("err: %v", err) - } + buf.Write([]byte("12345")) if buf.NewBytesWritten() != 5 { t.Fatalf("expected new bytes written to be 5, got %d", buf.NewBytesWritten()) @@ -77,10 +74,7 @@ func TestBuffer_ResetNewBytes(t *testing.T) { t.Fatalf("expected content to be 2345, got %s", buf.String()) } - _, err = buf.Write([]byte("6789")) - if err != nil { - return - } + buf.Write([]byte("6789")) if buf.NewBytesWritten() != 4 { t.Fatalf("expected new bytes written to be 4, got %d", buf.NewBytesWritten()) } diff --git a/worker/docker.go b/worker/docker.go index d3b87a978..774004c79 100644 --- a/worker/docker.go +++ b/worker/docker.go @@ -30,22 +30,17 @@ type DockerCommand struct { // Run runs the Docker command and blocks until done. func (dcmd DockerCommand) Run(ctx context.Context) error { + fmt.Println("DEBUG: Running Docker") // Sync docker API version info. err := SyncDockerAPIVersion() if err != nil { - err := dcmd.Event.Error("failed to sync docker client API version", err) - if err != nil { - return err - } + dcmd.Event.Error("failed to sync docker client API version", err) } pullcmd := exec.Command("docker", "pull", dcmd.Image) err = pullcmd.Run() if err != nil { - err := dcmd.Event.Error("failed to pull docker image", err) - if err != nil { - return err - } + dcmd.Event.Error("failed to pull docker image", err) } args := []string{"run", "-i", "--read-only"} @@ -75,12 +70,11 @@ func (dcmd DockerCommand) Run(ctx context.Context) error { args = append(args, dcmd.Image) args = append(args, dcmd.Command...) + // dcmd.Event.Info("DEBUG docker args: ", args) // Roughly: `docker run --rm -i --read-only -w [workdir] -v [bindings] [imageName] [cmd]` - err = dcmd.Event.Info("Running command", "cmd", "docker "+strings.Join(args, " ")) - if err != nil { - return err - } + err = dcmd.Event.Info(fmt.Sprintf("args: %s", args)) + dcmd.Event.Info("Running command", "cmd", "docker "+strings.Join(args, " ")) cmd := exec.Command("docker", args...) if dcmd.Stdin != nil { @@ -94,19 +88,13 @@ func (dcmd DockerCommand) Run(ctx context.Context) error { } go dcmd.inspectContainer(ctx) out := cmd.Run() - err = dcmd.Event.Info("Command %s Complete exit=%s", strings.Join(args, " "), out) - if err != nil { - return err - } + dcmd.Event.Info("Command %s Complete exit=%s", strings.Join(args, " "), out) return out } // Stop stops the container. func (dcmd DockerCommand) Stop() error { - err := dcmd.Event.Info("Stopping container", "container", dcmd.ContainerName) - if err != nil { - return err - } + dcmd.Event.Info("Stopping container", "container", dcmd.ContainerName) // cmd := exec.Command("docker", "stop", dcmd.ContainerName) cmd := exec.Command("docker", "rm", "-f", dcmd.ContainerName) //switching to this to be a bit more forceful return cmd.Run() @@ -147,13 +135,10 @@ func (dcmd *DockerCommand) inspectContainer(ctx context.Context) { meta := []metadata{} err := json.Unmarshal(out, &meta) if err == nil && len(meta) == 1 { - err := dcmd.Event.Info("container metadata", + dcmd.Event.Info("container metadata", "containerID", meta[0].ID, "containerName", meta[0].Name, "containerImageHash", meta[0].Image) - if err != nil { - return - } return } } diff --git a/worker/file_mapper.go b/worker/file_mapper.go index 4219c724d..e16994648 100644 --- a/worker/file_mapper.go +++ b/worker/file_mapper.go @@ -151,7 +151,7 @@ func (mapper *FileMapper) HostPath(src string) (string, error) { // OpenHostFile opens a file on the host file system at a mapped path. // "src" is an unmapped path. This function will handle mapping the path. // -// # This function calls os.Open +// This function calls os.Open // // If the path can't be mapped or the file can't be opened, an error is returned. func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error) { @@ -169,7 +169,7 @@ func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error) { // CreateHostFile creates a file on the host file system at a mapped path. // "src" is an unmapped path. This function will handle mapping the path. // -// # This function calls os.Create +// This function calls os.Create // // If the path can't be mapped or the file can't be created, an error is returned. func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error) { @@ -198,6 +198,8 @@ func (mapper *FileMapper) AddTmpVolume(mountPoint string) error { return err } + fmt.Println("DEBUG: mountPoint:", mountPoint) + fmt.Println("DEBUG: hostPath:", hostPath) err = fsutil.EnsureDir(hostPath) if err != nil { return err @@ -302,5 +304,6 @@ func (mapper *FileMapper) ContainerPath(src string) string { // Cleanup deletes the working directory. func (mapper *FileMapper) Cleanup() error { - return os.RemoveAll(mapper.WorkDir) + return nil + // return os.RemoveAll(mapper.WorkDir) } diff --git a/worker/step.go b/worker/step.go index 2a6d066ec..4f28c797f 100644 --- a/worker/step.go +++ b/worker/step.go @@ -21,11 +21,7 @@ func (s *stepWorker) Run(ctx context.Context) error { // WaitGroup to block closing the worker until the last event is written from events/executor.go:StreamLogTail() var wg sync.WaitGroup defer wg.Wait() - - err := s.Event.StartTime(time.Now()) - if err != nil { - return err - } + s.Event.StartTime(time.Now()) // subctx helps ensure that these goroutines are cleaned up, // even when the task is canceled. @@ -63,14 +59,14 @@ func (s *stepWorker) Run(ctx context.Context) error { select { case <-ctx.Done(): // Likely the task was canceled. - _ = s.Command.Stop() + s.Command.Stop() <-done - _ = s.Event.EndTime(time.Now()) + s.Event.EndTime(time.Now()) return ctx.Err() case result := <-done: - _ = s.Event.EndTime(time.Now()) - _ = s.Event.ExitCode(getExitCode(result)) + s.Event.EndTime(time.Now()) + s.Event.ExitCode(getExitCode(result)) return result } } diff --git a/worker/storage.go b/worker/storage.go index e84082889..0edba8d34 100644 --- a/worker/storage.go +++ b/worker/storage.go @@ -32,10 +32,7 @@ func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Stora } if len(list) == 0 { - err := ev.Warn("download source directory is empty", "url", input.Url) - if err != nil { - return nil, fmt.Errorf("writing event: %s", err) - } + ev.Warn("download source directory is empty", "url", input.Url) continue } @@ -100,10 +97,7 @@ func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.St } if len(list) == 0 { - err := ev.Warn("upload source directory is empty", "url", output.Url) - if err != nil { - return nil, fmt.Errorf("writing event: %s", err) - } + ev.Warn("upload source directory is empty", "url", output.Url) continue } @@ -167,22 +161,13 @@ func (d *download) Path() string { return d.in.Path } func (d *download) Started() { - err := d.ev.Info("download started", "url", d.in.Url) - if err != nil { - return - } + d.ev.Info("download started", "url", d.in.Url) } func (d *download) Finished(obj *storage.Object) { - err := d.ev.Info("download finished", "url", d.in.Url, "size", obj.Size, "etag", obj.ETag) - if err != nil { - d.err = err - } + d.ev.Info("download finished", "url", d.in.Url, "size", obj.Size, "etag", obj.ETag) } func (d *download) Failed(err error) { - e := d.ev.Error("download failed", "url", d.in.Url, "error", err) - if e != nil { - return - } + d.ev.Error("download failed", "url", d.in.Url, "error", err) d.cancel() d.err = err } @@ -201,10 +186,7 @@ func (u *upload) Path() string { return u.out.Path } func (u *upload) Started() { - err := u.ev.Info("upload started", "url", u.out.Url) - if err != nil { - return - } + u.ev.Info("upload started", "url", u.out.Url) } func (u *upload) Finished(obj *storage.Object) { u.log = &tes.OutputFileLog{ @@ -212,23 +194,17 @@ func (u *upload) Finished(obj *storage.Object) { Path: u.out.Path, SizeBytes: fmt.Sprintf("%d", obj.Size), } - err := u.ev.Info("upload finished", "url", obj.URL, "etag", obj.ETag, "size", obj.Size) - if err != nil { - return - } + u.ev.Info("upload finished", "url", obj.URL, "etag", obj.ETag, "size", obj.Size) } func (u *upload) Failed(err error) { u.err = err - e := u.ev.Error("upload failed", "url", u.out.Url, "error", err) - if e != nil { - return - } + u.ev.Error("upload failed", "url", u.out.Url, "error", err) } // fixLinks walks the output paths, fixing cases where a symlink is // broken because it's pointing to a path inside a container volume. func fixLinks(mapper *FileMapper, basepath string) { - e := filepath.Walk(basepath, func(p string, f os.FileInfo, err error) error { + filepath.Walk(basepath, func(p string, f os.FileInfo, err error) error { if err != nil { // There's an error, so be safe and give up on this file return nil @@ -263,16 +239,10 @@ func fixLinks(mapper *FileMapper, basepath string) { if err != nil { return nil } - err = os.Symlink(mapped, p) - if err != nil { - return nil - } + os.Symlink(mapped, p) } } } return nil }) - if e != nil { - return - } } diff --git a/worker/taskreader_test.go b/worker/taskreader_test.go index 6821a076c..994d13c28 100644 --- a/worker/taskreader_test.go +++ b/worker/taskreader_test.go @@ -12,7 +12,7 @@ func TestFileTaskReader(t *testing.T) { } ctx := context.Background() - task, _ := r.Task(ctx) + task, err := r.Task(ctx) if task.Name != "Hello world" { t.Error("unexpected task content") } @@ -29,7 +29,7 @@ func TestBase64TaskReader(t *testing.T) { } ctx := context.Background() - task, _ := r.Task(ctx) + task, err := r.Task(ctx) if task.Name != "Hello world" { t.Error("unexpected task content") } diff --git a/worker/worker.go b/worker/worker.go index 4e2126298..eb518205b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -4,7 +4,6 @@ package worker import ( "context" "fmt" - "log" "os" "path/filepath" "time" @@ -49,13 +48,10 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { // TODO if we failed to retrieve the task, we can't do anything useful. // but, it's also difficult to report the failure usefully. if run.syserr != nil { - err := r.EventWriter.WriteEvent(pctx, events.NewSystemLog("unknown", 0, 0, "error", + r.EventWriter.WriteEvent(pctx, events.NewSystemLog("unknown", 0, 0, "error", "failed to get task. ID unknown", map[string]string{ "error": run.syserr.Error(), })) - if err != nil { - return err - } runerr = run.syserr return } @@ -64,24 +60,12 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { event = events.NewTaskWriter(task.GetId(), 0, r.EventWriter) mapper = NewFileMapper(filepath.Join(r.Conf.WorkDir, task.GetId())) - err := event.Info("Version", version.LogFields()...) - if err != nil { - return err - } - err = event.State(tes.State_INITIALIZING) - if err != nil { - return err - } - err = event.StartTime(time.Now()) - if err != nil { - return err - } + event.Info("Version", version.LogFields()...) + event.State(tes.State_INITIALIZING) + event.StartTime(time.Now()) if name, err := os.Hostname(); err == nil { - err := event.Metadata(map[string]string{"hostname": name}) - if err != nil { - return err - } + event.Metadata(map[string]string{"hostname": name}) } // TODO: Increment taskgroup waitgroup, defer done on waitgroup @@ -91,58 +75,33 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { // Run the final logging/state steps in a deferred function // to ensure they always run, even if there's a missed error. defer func() { - err := event.EndTime(time.Now()) - if err != nil { - log.Printf("failed to set end time: %v", err) - } - + event.EndTime(time.Now()) + fmt.Println("DEBUG run:", run) switch { case run.taskCanceled: // The task was canceled. - err := event.Info("Canceled") - if err != nil { - log.Print(err) - } - err = event.State(tes.State_CANCELED) - if err != nil { - log.Print(err) - } + event.Info("Canceled") + event.State(tes.State_CANCELED) runerr = fmt.Errorf("task canceled") case run.syserr != nil: // Something else failed - err := event.Error("System error", "error", run.syserr) - if err != nil { - log.Print(err) - } - err = event.State(tes.State_SYSTEM_ERROR) - if err != nil { - log.Print(err) - } + fmt.Println("DEBUG syserr:", run.syserr) + event.Error("System error", "error", run.syserr) + event.State(tes.State_SYSTEM_ERROR) runerr = run.syserr case run.execerr != nil: // One of the executors failed - err := event.Error("Exec error", "error", run.execerr) - if err != nil { - log.Print(err) - } - err = event.State(tes.State_EXECUTOR_ERROR) - if err != nil { - log.Print(err) - } + fmt.Println("DEBUG execerr:", run.execerr) + event.Error("Exec error", "error", run.execerr) + event.State(tes.State_EXECUTOR_ERROR) runerr = run.execerr default: - err := event.State(tes.State_COMPLETE) - if err != nil { - log.Print(err) - } + event.State(tes.State_COMPLETE) } // cleanup workdir if !r.Conf.LeaveWorkDir { - err := mapper.Cleanup() - if err != nil { - panic(err) - } + mapper.Cleanup() } }() @@ -170,10 +129,7 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { } if run.ok() { - err := event.State(tes.State_RUNNING) - if err != nil { - return err - } + event.State(tes.State_RUNNING) } // Run steps @@ -228,10 +184,7 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { } if len(outputLog) > 0 { - err := event.Outputs(outputLog) - if err != nil { - return err - } + event.Outputs(outputLog) } return @@ -250,7 +203,7 @@ func (r *DefaultWorker) openStepLogs(mapper *FileMapper, s *stepWorker, d *tes.E if d.Stdin != "" { s.Command.Stdin, err = mapper.OpenHostFile(d.Stdin) if err != nil { - _ = s.Event.Error("Couldn't prepare log files", err) + s.Event.Error("Couldn't prepare log files", err) return err } } @@ -259,7 +212,7 @@ func (r *DefaultWorker) openStepLogs(mapper *FileMapper, s *stepWorker, d *tes.E if d.Stdout != "" { s.Command.Stdout, err = mapper.CreateHostFile(d.Stdout) if err != nil { - _ = s.Event.Error("Couldn't prepare log files", err) + s.Event.Error("Couldn't prepare log files", err) return err } }