Skip to content

Commit

Permalink
Add timeout and failureThreshold to multicluster probe (#13061)
Browse files Browse the repository at this point in the history
* Add timeout and failureThreshold to multicluster probe

- This adds the `probeSpec.failureThreshold` and `probeSpec.timeout` fields to the Link CRD spec.
- Likewise, the `gateway.probe.failureThreshold` and `gateway.probe.timeout` fields are added to the linkerd-multicluster chart, that are used to populate the new `mirror.linkerd.io/probe-failure-threshold` and `mirror.linkerd.io/probe-timeout` annotations in the gateway service (consumed by `linkerd mc link` to populate probe spec).
- In the probe worker, we replace the hard-coded 50s timeout with the new timeout config (which now defaults to 30s). And the probe loop got refactored in order to not mark the gateway as unhealty until the consecutive failures threshold is reached.

* Make probeTicker.C synchronous
  • Loading branch information
alpeb authored Sep 25, 2024
1 parent 995e51f commit 85222dd
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 54 deletions.
2 changes: 2 additions & 0 deletions multicluster/charts/linkerd-multicluster/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ Kubernetes: `>=1.22.0-0`
| gateway.nodeSelector | object | `{}` | Node selectors for the gateway pod |
| gateway.pauseImage | string | `"gcr.io/google_containers/pause:3.2"` | The pause container to use |
| gateway.port | int | `4143` | The port on which all the gateway will accept incoming traffic |
| gateway.probe.failureThreshold | int | `3` | Minimum consecutive failures for the probe to be considered failed |
| gateway.probe.path | string | `"/ready"` | The path that will be used by remote clusters for determining whether the gateway is alive |
| gateway.probe.port | int | `4191` | The port used for liveliness probing |
| gateway.probe.seconds | int | `3` | The interval (in seconds) between liveness probes |
| gateway.probe.timeout | string | `"30s"` | Probe request timeout (in go's time.Duration format) |
| gateway.replicas | int | `1` | Number of replicas for the gateway pod |
| gateway.serviceAnnotations | object | `{}` | Annotations to add to the gateway service |
| gateway.serviceExternalTrafficPolicy | string | `""` | Set externalTrafficPolicy on gateway service |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ metadata:
{{- with .Values.commonLabels }}{{ toYaml . | trim | nindent 4 }}{{- end }}
annotations:
mirror.linkerd.io/gateway-identity: {{.Values.gateway.name}}.{{.Release.Namespace}}.serviceaccount.identity.{{.Values.linkerdNamespace}}.{{.Values.identityTrustDomain}}
mirror.linkerd.io/probe-failure-threshold: "{{.Values.gateway.probe.failureThreshold}}"
mirror.linkerd.io/probe-period: "{{.Values.gateway.probe.seconds}}"
mirror.linkerd.io/probe-path: {{.Values.gateway.probe.path}}
mirror.linkerd.io/probe-timeout: "{{.Values.gateway.probe.timeout}}"
mirror.linkerd.io/multicluster-gateway: "true"
component: gateway
{{ include "partials.annotations.created-by" . }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ spec:
description: Spec for gateway health probe
type: object
properties:
failureThreshold:
default: "3"
description: Minimum consecutive failures for the probe to be considered failed
type: string
path:
description: Path of remote gateway health endpoint
type: string
Expand All @@ -49,6 +53,11 @@ spec:
port:
description: Port of remote gateway health endpoint
type: string
timeout:
default: 30s
description: Probe request timeout
format: duration
type: string
selector:
description: Kubernetes Label Selector
type: object
Expand Down
4 changes: 4 additions & 0 deletions multicluster/charts/linkerd-multicluster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ gateway:
# nodePort -- Set the gateway nodePort (for LoadBalancer or NodePort) to a specific value
# nodePort:
probe:
# -- Minimum consecutive failures for the probe to be considered failed
failureThreshold: 3
# -- The path that will be used by remote clusters for determining whether the
# gateway is alive
path: /ready
Expand All @@ -21,6 +23,8 @@ gateway:
# nodePort:
# -- The interval (in seconds) between liveness probes
seconds: 3
# -- Probe request timeout (in go's time.Duration format)
timeout: 30s
# -- Annotations to add to the gateway service
serviceAnnotations: {}
# -- Set externalTrafficPolicy on gateway service
Expand Down
11 changes: 11 additions & 0 deletions multicluster/cmd/testdata/install_default.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions multicluster/cmd/testdata/install_ha.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions multicluster/cmd/testdata/install_psp.golden

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion multicluster/service-mirror/jittered_ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewTicker(minDuration time.Duration, maxJitter time.Duration) *Ticker {
if maxJitter < 0 {
log.WithField("jitter", minDuration).Panic("Negative jitter")
}
c := make(chan time.Time, 1)
c := make(chan time.Time)
ticker := &Ticker{
C: c,
stop: make(chan bool),
Expand Down
77 changes: 40 additions & 37 deletions multicluster/service-mirror/probe_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
logging "github.com/sirupsen/logrus"
)

const httpGatewayTimeoutMillis = 50000

// ProbeWorker is responsible for monitoring gateways using a probe specification
type ProbeWorker struct {
localGatewayName string
Expand Down Expand Up @@ -65,76 +63,81 @@ func (pw *ProbeWorker) Start() {
}

func (pw *ProbeWorker) run() {
successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}

probeTickerPeriod := pw.probeSpec.Period
maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period
probeTicker := NewTicker(probeTickerPeriod, maxJitter)
defer probeTicker.Stop()

var failures uint32 = 0

probeLoop:
for {
select {
case <-pw.stopCh:
break probeLoop
case <-probeTicker.C:
pw.doProbe()
start := time.Now()
if err := pw.doProbe(); err != nil {
pw.log.Warn(err)
failures++
if failures < pw.probeSpec.FailureThreshold {
continue probeLoop
}

pw.log.Warnf("Failure threshold (%d) reached - Marking as unhealthy", pw.probeSpec.FailureThreshold)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
} else {
end := time.Since(start)
failures = 0

pw.log.Debug("Gateway is healthy")
pw.metrics.alive.Set(1)
pw.metrics.latency.Set(float64(end.Milliseconds()))
pw.metrics.latencies.Observe(float64(end.Milliseconds()))
pw.metrics.probes.With(successLabel).Inc()
if !pw.alive {
pw.alive = true
pw.Liveness <- true
}
}
}
}
}

func (pw *ProbeWorker) doProbe() {
func (pw *ProbeWorker) doProbe() error {
pw.RLock()
defer pw.RUnlock()

successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}

client := http.Client{
Timeout: httpGatewayTimeoutMillis * time.Millisecond,
Timeout: pw.probeSpec.Timeout,
}

strPort := strconv.Itoa(int(pw.probeSpec.Port))
urlAddress := net.JoinHostPort(pw.localGatewayName, strPort)
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", urlAddress, pw.probeSpec.Path), nil)
if err != nil {
pw.log.Errorf("Could not create a GET request to gateway: %s", err)
return
return fmt.Errorf("could not create a GET request to gateway: %w", err)
}

start := time.Now()
resp, err := client.Do(req)
end := time.Since(start)
if err != nil {
pw.log.Warnf("Problem connecting with gateway. Marking as unhealthy %s", err)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
return
return fmt.Errorf("problem connecting with gateway: %w", err)
}
if resp.StatusCode != 200 {
pw.log.Warnf("Gateway returned unexpected status %d. Marking as unhealthy", resp.StatusCode)
pw.metrics.alive.Set(0)
pw.metrics.probes.With(notSuccessLabel).Inc()
if pw.alive {
pw.alive = false
pw.Liveness <- false
}
} else {
pw.log.Debug("Gateway is healthy")
pw.metrics.alive.Set(1)
pw.metrics.latency.Set(float64(end.Milliseconds()))
pw.metrics.latencies.Observe(float64(end.Milliseconds()))
pw.metrics.probes.With(successLabel).Inc()
if !pw.alive {
pw.alive = true
pw.Liveness <- true
}
return fmt.Errorf("gateway returned unexpected status %d", resp.StatusCode)
}

if err := resp.Body.Close(); err != nil {
pw.log.Warnf("Failed to close response body %s", err)
}

return nil
}
10 changes: 6 additions & 4 deletions multicluster/values/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ type Gateway struct {

// Probe contains all options for the Probe Service
type Probe struct {
Path string `json:"path"`
Port uint32 `json:"port"`
NodePort uint32 `json:"nodePort"`
Seconds uint32 `json:"seconds"`
FailureThreshold uint32 `json:"failureThreshold"`
Path string `json:"path"`
Port uint32 `json:"port"`
NodePort uint32 `json:"nodePort"`
Seconds uint32 `json:"seconds"`
Timeout string `json:"timeout"`
}

// NewInstallValues returns a new instance of the Values type.
Expand Down
6 changes: 6 additions & 0 deletions pkg/k8s/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,12 +467,18 @@ const (
// GatewayIdentity can be found on the remote gateway service
GatewayIdentity = SvcMirrorPrefix + "/gateway-identity"

// GatewayProbeFailureThreshold is the minimum consecutive failures for the probe to be considered failed
GatewayProbeFailureThreshold = SvcMirrorPrefix + "/probe-failure-threshold"

// GatewayProbePeriod the interval at which the health of the gateway should be probed
GatewayProbePeriod = SvcMirrorPrefix + "/probe-period"

// GatewayProbePath the path at which the health of the gateway should be probed
GatewayProbePath = SvcMirrorPrefix + "/probe-path"

// GatewayProbeTimeout is the probe request timeout
GatewayProbeTimeout = SvcMirrorPrefix + "/probe-timeout"

// ConfigKeyName is the key in the secret that stores the kubeconfig needed to connect
// to a remote cluster
ConfigKeyName = "kubeconfig"
Expand Down
Loading

0 comments on commit 85222dd

Please sign in to comment.