Skip to content

Commit

Permalink
reverseproxy: Minor fixes and cleanup
Browse files Browse the repository at this point in the history
Now use context cancellation to stop active health checker, which is
simpler than and just as effective as using a separate stop channel.
  • Loading branch information
mholt committed Aug 8, 2020
1 parent 65a0952 commit e2f913b
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 45 deletions.
11 changes: 4 additions & 7 deletions modules/caddyhttp/reverseproxy/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ type ActiveHealthChecks struct {
// body of a healthy backend.
ExpectBody string `json:"expect_body,omitempty"`

stopChan chan struct{}
httpClient *http.Client
bodyRegexp *regexp.Regexp
logger *zap.Logger
Expand Down Expand Up @@ -137,8 +136,7 @@ func (h *Handler) activeHealthChecker() {
select {
case <-ticker.C:
h.doActiveHealthCheckForAllHosts()
case <-h.HealthChecks.Active.stopChan:
// TODO: consider using a Context for cancellation instead
case <-h.ctx.Done():
ticker.Stop()
return
}
Expand Down Expand Up @@ -341,8 +339,8 @@ func (h *Handler) countFailure(upstream *Upstream) {
if err != nil {
h.HealthChecks.Passive.logger.Error("could not count failure",
zap.String("host", upstream.Dial),
zap.Error(err),
)
zap.Error(err))
return
}

// forget it later
Expand All @@ -357,8 +355,7 @@ func (h *Handler) countFailure(upstream *Upstream) {
if err != nil {
h.HealthChecks.Passive.logger.Error("could not forget failure",
zap.String("host", upstream.Dial),
zap.Error(err),
)
zap.Error(err))
}
}(upstream.Host, failDuration)
}
2 changes: 1 addition & 1 deletion modules/caddyhttp/reverseproxy/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (u *Upstream) fillDialInfo(r *http.Request) (DialInfo, error) {
// of the state of a remote host. It implements the
// Host interface.
type upstreamHost struct {
numRequests int64 // must be first field to be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
numRequests int64 // must be 64-bit aligned on 32-bit systems (see https://golang.org/pkg/sync/atomic/#pkg-note-BUG)
fails int64
unhealthy int32
}
Expand Down
79 changes: 42 additions & 37 deletions modules/caddyhttp/reverseproxy/reverseproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type Handler struct {
Transport http.RoundTripper `json:"-"`
CB CircuitBreaker `json:"-"`

ctx caddy.Context
logger *zap.Logger
}

Expand All @@ -125,6 +126,7 @@ func (Handler) CaddyModule() caddy.ModuleInfo {

// Provision ensures that h is set up properly before use.
func (h *Handler) Provision(ctx caddy.Context) error {
h.ctx = ctx
h.logger = ctx.Logger(h)

// start by loading modules
Expand Down Expand Up @@ -235,36 +237,43 @@ func (h *Handler) Provision(ctx caddy.Context) error {
}
}

// if active health checks are enabled, configure them and start a worker
if h.HealthChecks != nil &&
h.HealthChecks.Active != nil &&
(h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
h.HealthChecks.Active.logger = h.logger.Named("health_checker.active")

timeout := time.Duration(h.HealthChecks.Active.Timeout)
if timeout == 0 {
timeout = 5 * time.Second
if h.HealthChecks != nil {
// set defaults on passive health checks, if necessary
if h.HealthChecks.Passive != nil {
if h.HealthChecks.Passive.FailDuration > 0 && h.HealthChecks.Passive.MaxFails == 0 {
h.HealthChecks.Passive.MaxFails = 1
}
}

h.HealthChecks.Active.stopChan = make(chan struct{})
h.HealthChecks.Active.httpClient = &http.Client{
Timeout: timeout,
Transport: h.Transport,
}
// if active health checks are enabled, configure them and start a worker
if h.HealthChecks.Active != nil &&
(h.HealthChecks.Active.Path != "" || h.HealthChecks.Active.Port != 0) {
h.HealthChecks.Active.logger = h.logger.Named("health_checker.active")

if h.HealthChecks.Active.Interval == 0 {
h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
}
timeout := time.Duration(h.HealthChecks.Active.Timeout)
if timeout == 0 {
timeout = 5 * time.Second
}

if h.HealthChecks.Active.ExpectBody != "" {
var err error
h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
if err != nil {
return fmt.Errorf("expect_body: compiling regular expression: %v", err)
h.HealthChecks.Active.httpClient = &http.Client{
Timeout: timeout,
Transport: h.Transport,
}

if h.HealthChecks.Active.Interval == 0 {
h.HealthChecks.Active.Interval = caddy.Duration(30 * time.Second)
}

if h.HealthChecks.Active.ExpectBody != "" {
var err error
h.HealthChecks.Active.bodyRegexp, err = regexp.Compile(h.HealthChecks.Active.ExpectBody)
if err != nil {
return fmt.Errorf("expect_body: compiling regular expression: %v", err)
}
}
}

go h.activeHealthChecker()
go h.activeHealthChecker()
}
}

// set up any response routes
Expand All @@ -280,14 +289,6 @@ func (h *Handler) Provision(ctx caddy.Context) error {

// Cleanup cleans up the resources made by h during provisioning.
func (h *Handler) Cleanup() error {
// stop the active health checker
if h.HealthChecks != nil &&
h.HealthChecks.Active != nil &&
h.HealthChecks.Active.stopChan != nil {
// TODO: consider using context cancellation, could be much simpler
close(h.HealthChecks.Active.stopChan)
}

// TODO: Close keepalive connections on reload? https://github.com/caddyserver/caddy/pull/2507/files#diff-70219fd88fe3f36834f474ce6537ed26R762

// remove hosts from our config from the pool
Expand Down Expand Up @@ -351,7 +352,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
if proxyErr == nil {
proxyErr = fmt.Errorf("no upstreams available")
}
if !h.LoadBalancing.tryAgain(start, proxyErr, r) {
if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
break
}
continue
Expand Down Expand Up @@ -410,7 +411,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
h.countFailure(upstream)

// if we've tried long enough, break
if !h.LoadBalancing.tryAgain(start, proxyErr, r) {
if !h.LoadBalancing.tryAgain(h.ctx, start, proxyErr, r) {
break
}
}
Expand Down Expand Up @@ -661,7 +662,7 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, di Dia
// long enough before the next retry (i.e. no more sleeping is
// needed). If false is returned, the handler should stop trying to
// proxy the request.
func (lb LoadBalancing) tryAgain(start time.Time, proxyErr error, req *http.Request) bool {
func (lb LoadBalancing) tryAgain(ctx caddy.Context, start time.Time, proxyErr error, req *http.Request) bool {
// if we've tried long enough, break
if time.Since(start) >= time.Duration(lb.TryDuration) {
return false
Expand All @@ -687,8 +688,12 @@ func (lb LoadBalancing) tryAgain(start time.Time, proxyErr error, req *http.Requ
}

// otherwise, wait and try the next available host
time.Sleep(time.Duration(lb.TryInterval))
return true
select {
case <-time.After(time.Duration(lb.TryInterval)):
return true
case <-ctx.Done():
return false
}
}

// directRequest modifies only req.URL so that it points to the upstream
Expand Down

0 comments on commit e2f913b

Please sign in to comment.