diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 50cd04083452..96dbaeefbc7e 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -62,7 +62,7 @@ const ( badProbeTemplate = "unexpected probe header value: %s" failingHealthcheck = "failing healthcheck" - healthURLTemplate = "http://127.0.0.1:%d" + healthURLPrefix = "http://127.0.0.1:" // The 25 millisecond retry interval is an unscientific compromise between wanting to get // started as early as possible while still wanting to give the container some breathing // room to get up and running. @@ -77,6 +77,13 @@ var ( readinessProbeTimeout = flag.Int("probe-period", -1, "run readiness probe with given timeout") ) +// Scaled down config to use during exec probing. +type probeConfig struct { + QueueServingPort int `split_words:"true" required:"true"` + // DownwardAPI configuration for pod labels + DownwardAPILabelsPath string `split_words:"true"` +} + type config struct { ContainerConcurrency int `split_words:"true" required:"true"` QueueServingPort int `split_words:"true" required:"true"` @@ -236,40 +243,35 @@ func knativeProbeHandler(healthState *health.State, prober func() bool, isAggres } } -func probeQueueHealthPath(port, timeoutSeconds int, env config) error { - if port <= 0 { - return fmt.Errorf("port must be a positive value, got %d", port) +func probeQueueHealthPath(timeoutSeconds int, env probeConfig) error { + if env.QueueServingPort <= 0 { + return fmt.Errorf("port must be a positive value, got %d", env.QueueServingPort) } - url := fmt.Sprintf(healthURLTemplate, port) + url := healthURLPrefix + strconv.Itoa(env.QueueServingPort) timeoutDuration := readiness.PollTimeout if timeoutSeconds != 0 { timeoutDuration = time.Duration(timeoutSeconds) * time.Second } + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return fmt.Errorf("probe failed: error creating request: %w", err) + } + // Add the header to indicate this is a probe request. + req.Header.Add(network.ProbeHeaderName, queue.Name) + req.Header.Add(network.UserAgentKey, network.QueueProxyUserAgent) + httpClient := &http.Client{ - Transport: &http.Transport{ - // Do not use the cached connection - DisableKeepAlives: true, - }, Timeout: timeoutDuration, } ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) defer cancel() - stopCh := ctx.Done() var lastErr error // Using PollImmediateUntil instead of PollImmediate because if timeout is reached while waiting for first // invocation of conditionFunc, it exits immediately without trying for a second time. timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) { - var req *http.Request - req, lastErr = http.NewRequest(http.MethodGet, url, nil) - if lastErr != nil { - // Return nil error for retrying - return false, nil - } - // Add the header to indicate this is a probe request. - req.Header.Add(network.ProbeHeaderName, queue.Name) - req.Header.Add(network.UserAgentKey, network.QueueProxyUserAgent) res, lastErr := httpClient.Do(req) if lastErr != nil { // Return nil error for retrying @@ -285,7 +287,7 @@ func probeQueueHealthPath(port, timeoutSeconds int, env config) error { return false, errors.New("failing probe deliberately for pod scaledown") } return success, nil - }, stopCh) + }, ctx.Done()) if lastErr != nil { return fmt.Errorf("failed to probe: %w", lastErr) @@ -302,16 +304,15 @@ func probeQueueHealthPath(port, timeoutSeconds int, env config) error { func main() { flag.Parse() - // Parse the environment. - var env config - if err := envconfig.Process("", &env); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - // If this is set, we run as a standalone binary to probe the queue-proxy. if *readinessProbeTimeout >= 0 { - if err := probeQueueHealthPath(env.QueueServingPort, *readinessProbeTimeout, env); err != nil { + // Parse the environment. + var env probeConfig + if err := envconfig.Process("", &env); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + if err := probeQueueHealthPath(*readinessProbeTimeout, env); err != nil { // used instead of the logger to produce a concise event message fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -319,6 +320,13 @@ func main() { os.Exit(0) } + // Parse the environment. + var env config + if err := envconfig.Process("", &env); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + // Setup the logger. logger, _ = pkglogging.NewLogger(env.ServingLoggingConfig, env.ServingLoggingLevel) logger = logger.Named("queueproxy") diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index 932409e1e0e7..25664ccbbb6d 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -169,9 +169,7 @@ func TestProbeHandler(t *testing.T) { } func TestProbeQueueInvalidPort(t *testing.T) { - const port = 0 // invalid port - - if err := probeQueueHealthPath(port, 1, config{}); err == nil { + if err := probeQueueHealthPath(1, probeConfig{QueueServingPort: 0}); err == nil { t.Error("Expected error, got nil") } else if diff := cmp.Diff(err.Error(), "port must be a positive value, got 0"); diff != "" { t.Errorf("Unexpected not ready message: %s", diff) @@ -179,9 +177,7 @@ func TestProbeQueueInvalidPort(t *testing.T) { } func TestProbeQueueConnectionFailure(t *testing.T) { - port := 12345 // some random port (that's not listening) - - if err := probeQueueHealthPath(port, 1, config{}); err == nil { + if err := probeQueueHealthPath(1, probeConfig{QueueServingPort: 12345}); err == nil { t.Error("Expected error, got nil") } } @@ -205,7 +201,7 @@ func TestProbeQueueNotReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - err = probeQueueHealthPath(port, 1, config{}) + err = probeQueueHealthPath(1, probeConfig{QueueServingPort: port}) if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" { t.Errorf("Unexpected not ready message: %s", diff) @@ -235,7 +231,7 @@ func TestProbeQueueReady(t *testing.T) { t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err) } - if err = probeQueueHealthPath(port, 1, config{}); err != nil { + if err = probeQueueHealthPath(1, probeConfig{QueueServingPort: port}); err != nil { t.Errorf("probeQueueHealthPath(%d, 1s) = %s", port, err) } @@ -276,7 +272,10 @@ func TestProbeFailFast(t *testing.T) { f.Close() start := time.Now() - if err = probeQueueHealthPath(port, 1 /*seconds*/, config{DownwardAPILabelsPath: f.Name()}); err == nil { + if err = probeQueueHealthPath(1 /*seconds*/, probeConfig{ + QueueServingPort: port, + DownwardAPILabelsPath: f.Name(), + }); err == nil { t.Error("probeQueueHealthPath did not fail") } @@ -307,7 +306,7 @@ func TestProbeQueueTimeout(t *testing.T) { } timeout := 1 - if err = probeQueueHealthPath(port, timeout, config{}); err == nil { + if err = probeQueueHealthPath(timeout, probeConfig{QueueServingPort: port}); err == nil { t.Errorf("Expected probeQueueHealthPath(%d, %v) to return timeout error", port, timeout) } @@ -341,7 +340,7 @@ func TestProbeQueueDelayedReady(t *testing.T) { } timeout := 0 - if err := probeQueueHealthPath(port, timeout, config{}); err != nil { + if err := probeQueueHealthPath(timeout, probeConfig{QueueServingPort: port}); err != nil { t.Errorf("probeQueueHealthPath(%d) = %s", port, err) } }