Skip to content

Commit

Permalink
config: use time.ParseDuration for duration strings
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Buchanan committed Apr 9, 2018
1 parent 59a452f commit 76f0d08
Show file tree
Hide file tree
Showing 20 changed files with 127 additions and 90 deletions.
2 changes: 1 addition & 1 deletion cmd/aws/batch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func DefaultConfig() Config {
c.Funnel.DynamoDB.TableBasename = "funnel"
c.Funnel.DynamoDB.Region = ""
c.Funnel.Worker.WorkDir = "/opt/funnel-work-dir"
c.Funnel.Worker.LogUpdateRate = time.Minute * 5
c.Funnel.Worker.LogUpdateRate = config.Duration(time.Minute * 5)
c.Funnel.Worker.LogTailSize = 10000

return c
Expand Down
22 changes: 11 additions & 11 deletions cmd/util/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func serverFlags(flagConf *config.Config) *pflag.FlagSet {
f.StringVar(&flagConf.Server.HTTPPort, "Server.HTTPPort", flagConf.Server.HTTPPort, "HTTP Port")
f.StringVar(&flagConf.Server.RPCPort, "Server.RPCPort", flagConf.Server.RPCPort, "RPC Port")
f.StringVar(&flagConf.Server.ServiceName, "Server.ServiceName", flagConf.Server.ServiceName, "Host name or IP")
f.DurationVar(&flagConf.Server.RPCClientTimeout, "Server.RPCClientTimeout", flagConf.Server.RPCClientTimeout, "Request timeout for RPC client connections")
f.Var(flagConf.Server.RPCClientTimeout, "Server.RPCClientTimeout", "Request timeout for RPC client connections")
f.UintVar(&flagConf.Server.RPCClientMaxRetries, "Server.RPCClientMaxRetries", flagConf.Server.RPCClientMaxRetries, "Maximum number of times that a request will be retried for failures")

return f
Expand All @@ -86,8 +86,8 @@ func workerFlags(flagConf *config.Config) *pflag.FlagSet {
f := pflag.NewFlagSet("", pflag.ContinueOnError)

f.Int64Var(&flagConf.Worker.LogTailSize, "Worker.LogTailSize", flagConf.Worker.LogTailSize, "Max bytes to store for stdout/stderr")
f.DurationVar(&flagConf.Worker.LogUpdateRate, "Worker.LogUpdateRate", flagConf.Worker.LogUpdateRate, "How often to send stdout/stderr log updates")
f.DurationVar(&flagConf.Worker.PollingRate, "Worker.PollingRate", flagConf.Worker.PollingRate, "How often to poll for cancel signals")
f.Var(flagConf.Worker.LogUpdateRate, "Worker.LogUpdateRate", "How often to send stdout/stderr log updates")
f.Var(flagConf.Worker.PollingRate, "Worker.PollingRate", "How often to poll for cancel signals")
f.StringVar(&flagConf.Worker.WorkDir, "Worker.WorkDir", flagConf.Worker.WorkDir, "Working directory")
f.BoolVar(&flagConf.Worker.LeaveWorkDir, "Worker.LeaveWorkDir", flagConf.Worker.LeaveWorkDir, "Leave working directory after execution")

Expand All @@ -101,8 +101,8 @@ func nodeFlags(flagConf *config.Config) *pflag.FlagSet {
f.Uint32Var(&flagConf.Node.Resources.Cpus, "Node.Resources.Cpus", flagConf.Node.Resources.Cpus, "Cpus available to Node")
f.Float64Var(&flagConf.Node.Resources.RamGb, "Node.Resources.RamGb", flagConf.Node.Resources.RamGb, "Ram (GB) available to Node")
f.Float64Var(&flagConf.Node.Resources.DiskGb, "Node.Resources.DiskGb", flagConf.Node.Resources.DiskGb, "Free disk (GB) available to Node")
f.DurationVar(&flagConf.Node.Timeout, "Node.Timeout", flagConf.Node.Timeout, "Node timeout in seconds")
f.DurationVar(&flagConf.Node.UpdateRate, "Node.UpdateRate", flagConf.Node.UpdateRate, "Node update rate")
f.Var(flagConf.Node.Timeout, "Node.Timeout", "Node timeout in seconds")
f.Var(flagConf.Node.UpdateRate, "Node.UpdateRate", "Node update rate")
// TODO Metadata

return f
Expand Down Expand Up @@ -143,7 +143,7 @@ func dbFlags(flagConf *config.Config) *pflag.FlagSet {
// mongodb
f.StringSliceVar(&flagConf.MongoDB.Addrs, "MongoDB.Addrs", flagConf.MongoDB.Addrs, "Address of a MongoDB seed server. This flag can be used multiple times")
f.StringVar(&flagConf.MongoDB.Database, "MongoDB.Database", flagConf.MongoDB.Database, "Database name in MongoDB")
f.DurationVar(&flagConf.MongoDB.Timeout, "MongoDB.Timeout", flagConf.MongoDB.Timeout, "Timeout in seconds for initial connection and follow up operations")
f.Var(flagConf.MongoDB.Timeout, "MongoDB.Timeout", "Timeout in seconds for initial connection and follow up operations")

return f
}
Expand All @@ -169,7 +169,7 @@ func storageFlags(flagConf *config.Config) *pflag.FlagSet {

// HTTP storage
f.BoolVar(&flagConf.HTTPStorage.Disabled, "HTTPStorage.Disabled", flagConf.HTTPStorage.Disabled, "Disable storage backend")
f.DurationVar(&flagConf.HTTPStorage.Timeout, "HTTPStorage.Timeout", flagConf.HTTPStorage.Timeout, "Timeout in seconds for request")
f.Var(flagConf.HTTPStorage.Timeout, "HTTPStorage.Timeout", "Timeout in seconds for request")

return f
}
Expand All @@ -193,11 +193,11 @@ func computeFlags(flagConf *config.Config) *pflag.FlagSet {
f.StringVar(&flagConf.PBS.Template, "PBS.Template", flagConf.PBS.Template, "Path to submit template file")

// Scheduler
f.DurationVar(&flagConf.Scheduler.ScheduleRate, "Scheduler.ScheduleRate", flagConf.Scheduler.ScheduleRate, "How often to run a scheduler iteration")
f.Var(flagConf.Scheduler.ScheduleRate, "Scheduler.ScheduleRate", "How often to run a scheduler iteration")
f.IntVar(&flagConf.Scheduler.ScheduleChunk, "Scheduler.ScheduleChunk", flagConf.Scheduler.ScheduleChunk, "How many tasks to schedule in one iteration")
f.DurationVar(&flagConf.Scheduler.NodePingTimeout, "Scheduler.NodePingTimeout", flagConf.Scheduler.NodePingTimeout, "How long to wait for a node ping before marking it as dead")
f.DurationVar(&flagConf.Scheduler.NodeInitTimeout, "Scheduler.NodeInitTimeout", flagConf.Scheduler.NodeInitTimeout, "How long to wait for node initialization before marking it dead")
f.DurationVar(&flagConf.Scheduler.NodeDeadTimeout, "Scheduler.NodeDeadTimeout", flagConf.Scheduler.NodeDeadTimeout, "How long to wait before deleting a dead node from the DB")
f.Var(flagConf.Scheduler.NodePingTimeout, "Scheduler.NodePingTimeout", "How long to wait for a node ping before marking it as dead")
f.Var(flagConf.Scheduler.NodeInitTimeout, "Scheduler.NodeInitTimeout", "How long to wait for node initialization before marking it dead")
f.Var(flagConf.Scheduler.NodeDeadTimeout, "Scheduler.NodeDeadTimeout", "How long to wait before deleting a dead node from the DB")

// Slurm
f.StringVar(&flagConf.Slurm.Template, "Slurm.Template", flagConf.Slurm.Template, "Path to submit template file")
Expand Down
2 changes: 1 addition & 1 deletion compute/batch/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (b *Backend) Cancel(ctx context.Context, taskID string) error {
// In this context a "FAILED" state is being used as a generic term that captures
// one or more terminal states for the backend.
func (b *Backend) reconcile(ctx context.Context) {
ticker := time.NewTicker(b.conf.ReconcileRate)
ticker := time.NewTicker(time.Duration(b.conf.ReconcileRate))

for {
select {
Expand Down
3 changes: 2 additions & 1 deletion compute/htcondor/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/exec"
"regexp"
"strings"
"time"

"github.com/ohsu-comp-bio/funnel/compute"
"github.com/ohsu-comp-bio/funnel/config"
Expand All @@ -27,7 +28,7 @@ func NewBackend(ctx context.Context, conf config.Config, reader tes.ReadOnlyServ
Database: reader,
ExtractID: extractID,
MapStates: mapStates,
ReconcileRate: conf.HTCondor.ReconcileRate,
ReconcileRate: time.Duration(conf.HTCondor.ReconcileRate),
}

if !conf.HTCondor.DisableReconciler {
Expand Down
3 changes: 2 additions & 1 deletion compute/pbs/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/xml"
"fmt"
"os/exec"
"time"

"github.com/ohsu-comp-bio/funnel/compute"
"github.com/ohsu-comp-bio/funnel/config"
Expand All @@ -25,7 +26,7 @@ func NewBackend(ctx context.Context, conf config.Config, reader tes.ReadOnlyServ
Database: reader,
ExtractID: extractID,
MapStates: mapStates,
ReconcileRate: conf.PBS.ReconcileRate,
ReconcileRate: time.Duration(conf.PBS.ReconcileRate),
}

if !conf.PBS.DisableReconciler {
Expand Down
6 changes: 3 additions & 3 deletions compute/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewNodeProcess(ctx context.Context, conf config.Config, factory Worker, log
log.Error("error detecting resources", "error", derr)
}

timeout := util.NewIdleTimeout(conf.Node.Timeout)
timeout := util.NewIdleTimeout(time.Duration(conf.Node.Timeout))
state := NodeState_UNINITIALIZED

return &NodeProcess{
Expand Down Expand Up @@ -71,7 +71,7 @@ func (n *NodeProcess) Run(ctx context.Context) {
n.checkConnection(ctx)
n.sync(ctx)

ticker := time.NewTicker(n.conf.Node.UpdateRate)
ticker := time.NewTicker(time.Duration(n.conf.Node.UpdateRate))
defer ticker.Stop()

for {
Expand Down Expand Up @@ -182,7 +182,7 @@ func (n *NodeProcess) runTask(ctx context.Context, id string) {
// task cannot fully complete until it has successfully removed the
// assigned ID from the node database. this helps prevent tasks from
// running multiple times.
ticker := time.NewTicker(n.conf.Node.UpdateRate)
ticker := time.NewTicker(time.Duration(n.conf.Node.UpdateRate))
defer ticker.Stop()
for {
select {
Expand Down
6 changes: 3 additions & 3 deletions compute/scheduler/node_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,16 @@ func UpdateNodeState(nodes []*Node, conf config.Scheduler) []*Node {
if node.State == NodeState_UNINITIALIZED || node.State == NodeState_INITIALIZING {

// The node is initializing, which has a more liberal timeout.
if d > conf.NodeInitTimeout {
if d > time.Duration(conf.NodeInitTimeout) {
// Looks like the node failed to initialize. Mark it dead
node.State = NodeState_DEAD
}

} else if node.State == NodeState_DEAD && d > conf.NodeDeadTimeout {
} else if node.State == NodeState_DEAD && d > time.Duration(conf.NodeDeadTimeout) {
// The node has been dead for long enough.
node.State = NodeState_GONE

} else if d > conf.NodePingTimeout {
} else if d > time.Duration(conf.NodePingTimeout) {
// The node hasn't pinged in awhile, mark it dead.
node.State = NodeState_DEAD
}
Expand Down
2 changes: 1 addition & 1 deletion compute/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Scheduler struct {
// request the the configured backend schedule them, and
// act on offers made by the backend.
func (s *Scheduler) Run(ctx context.Context) error {
ticker := time.NewTicker(s.Conf.ScheduleRate)
ticker := time.NewTicker(time.Duration(s.Conf.ScheduleRate))
for {
select {
case <-ctx.Done():
Expand Down
3 changes: 2 additions & 1 deletion compute/slurm/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/exec"
"regexp"
"strings"
"time"

"github.com/ohsu-comp-bio/funnel/compute"
"github.com/ohsu-comp-bio/funnel/config"
Expand All @@ -28,7 +29,7 @@ func NewBackend(ctx context.Context, conf config.Config, reader tes.ReadOnlyServ
Database: reader,
ExtractID: extractID,
MapStates: mapStates,
ReconcileRate: conf.Slurm.ReconcileRate,
ReconcileRate: time.Duration(conf.Slurm.ReconcileRate),
}

if !conf.Slurm.DisableReconciler {
Expand Down
27 changes: 13 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package config

import (
"os"
"time"

"github.com/ohsu-comp-bio/funnel/logger"
)
Expand Down Expand Up @@ -57,7 +56,7 @@ type Server struct {
// The timeout to use for making RPC client connections in nanoseconds
// This timeout is Only enforced when used in conjunction with the
// grpc.WithBlock dial option.
RPCClientTimeout time.Duration
RPCClientTimeout Duration
// The maximum number of times that a request will be retried for failures.
// Time between retries follows an exponential backoff starting at 5 seconds
// up to 1 minute
Expand Down Expand Up @@ -88,15 +87,15 @@ func (c *Server) RPCAddress() string {
// Scheduler contains funnel's basic scheduler configuration.
type Scheduler struct {
// How often to run a scheduler iteration.
ScheduleRate time.Duration
ScheduleRate Duration
// How many tasks to schedule in one iteration.
ScheduleChunk int
// How long to wait for a node ping before marking it as dead
NodePingTimeout time.Duration
NodePingTimeout Duration
// How long to wait for node initialization before marking it dead
NodeInitTimeout time.Duration
NodeInitTimeout Duration
// How long to wait before deleting a dead node from the DB.
NodeDeadTimeout time.Duration
NodeDeadTimeout Duration
}

// Node contains the configuration for a node. Nodes track available resources
Expand All @@ -112,9 +111,9 @@ type Node struct {
}
// If the node has been idle for longer than the timeout, it will shut down.
// -1 means there is no timeout. 0 means timeout immediately after the first task.
Timeout time.Duration
Timeout Duration
// How often the node sends update requests to the server.
UpdateRate time.Duration
UpdateRate Duration
Metadata map[string]string
}

Expand All @@ -123,11 +122,11 @@ type Worker struct {
// Directory to write task files to
WorkDir string
// How often the worker should poll for cancel signals
PollingRate time.Duration
PollingRate Duration
// How often to update stdout/stderr log fields.
// Setting this to 0 will result in these fields being updated a single time
// after the executor exits.
LogUpdateRate time.Duration
LogUpdateRate Duration
// Max bytes of stdout/stderr to store in the database.
// Setting this to 0 turns off stdout/stderr logging.
LogTailSize int64
Expand All @@ -145,7 +144,7 @@ type HPCBackend struct {
DisableReconciler bool
// ReconcileRate is how often the compute backend compares states in Funnel's backend
// to those reported by the backend
ReconcileRate time.Duration
ReconcileRate Duration
Template string
}

Expand All @@ -164,7 +163,7 @@ type MongoDB struct {
// first connecting and on follow up operations in the session. If
// timeout is zero, the call may block forever waiting for a connection
// to be established.
Timeout time.Duration
Timeout Duration
// Username and Password inform the credentials for the initial authentication
// done on the database defined by the Database field.
Username string
Expand Down Expand Up @@ -222,7 +221,7 @@ type AWSBatch struct {
DisableReconciler bool
// ReconcileRate is how often the compute backend compares states in Funnel's backend
// to those reported by AWS Batch
ReconcileRate time.Duration
ReconcileRate Duration
AWSConfig
}

Expand Down Expand Up @@ -326,7 +325,7 @@ func (s SwiftStorage) Valid() bool {
type HTTPStorage struct {
Disabled bool
// Timeout duration for http GET calls
Timeout time.Duration
Timeout Duration
}

// Valid validates the HTTPStorage configuration.
Expand Down
Loading

0 comments on commit 76f0d08

Please sign in to comment.