Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 36 additions & 28 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
badProbeTemplate = "unexpected probe header value: %s"
failingHealthcheck = "failing healthcheck"

healthURLTemplate = "http://127.0.0.1:%d"
healthURLPrefix = "http://127.0.0.1:"
// The 25 millisecond retry interval is an unscientific compromise between wanting to get
// started as early as possible while still wanting to give the container some breathing
// room to get up and running.
Expand All @@ -77,6 +77,13 @@ var (
readinessProbeTimeout = flag.Int("probe-period", -1, "run readiness probe with given timeout")
)

// Scaled down config to use during exec probing.
type probeConfig struct {
QueueServingPort int `split_words:"true" required:"true"`
// DownwardAPI configuration for pod labels
DownwardAPILabelsPath string `split_words:"true"`
}

type config struct {
ContainerConcurrency int `split_words:"true" required:"true"`
QueueServingPort int `split_words:"true" required:"true"`
Expand Down Expand Up @@ -236,40 +243,35 @@ func knativeProbeHandler(healthState *health.State, prober func() bool, isAggres
}
}

func probeQueueHealthPath(port, timeoutSeconds int, env config) error {
if port <= 0 {
return fmt.Errorf("port must be a positive value, got %d", port)
func probeQueueHealthPath(timeoutSeconds int, env probeConfig) error {
if env.QueueServingPort <= 0 {
return fmt.Errorf("port must be a positive value, got %d", env.QueueServingPort)
}

url := fmt.Sprintf(healthURLTemplate, port)
url := healthURLPrefix + strconv.Itoa(env.QueueServingPort)
timeoutDuration := readiness.PollTimeout
if timeoutSeconds != 0 {
timeoutDuration = time.Duration(timeoutSeconds) * time.Second
}

req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("probe failed: error creating request: %w", err)
}
// Add the header to indicate this is a probe request.
req.Header.Add(network.ProbeHeaderName, queue.Name)
req.Header.Add(network.UserAgentKey, network.QueueProxyUserAgent)

httpClient := &http.Client{
Transport: &http.Transport{
// Do not use the cached connection
DisableKeepAlives: true,
},
Timeout: timeoutDuration,
}
ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
defer cancel()
stopCh := ctx.Done()

var lastErr error
// Using PollImmediateUntil instead of PollImmediate because if timeout is reached while waiting for first
// invocation of conditionFunc, it exits immediately without trying for a second time.
timeoutErr := wait.PollImmediateUntil(aggressivePollInterval, func() (bool, error) {
var req *http.Request
req, lastErr = http.NewRequest(http.MethodGet, url, nil)
if lastErr != nil {
// Return nil error for retrying
return false, nil
}
// Add the header to indicate this is a probe request.
req.Header.Add(network.ProbeHeaderName, queue.Name)
req.Header.Add(network.UserAgentKey, network.QueueProxyUserAgent)
res, lastErr := httpClient.Do(req)
if lastErr != nil {
// Return nil error for retrying
Expand All @@ -285,7 +287,7 @@ func probeQueueHealthPath(port, timeoutSeconds int, env config) error {
return false, errors.New("failing probe deliberately for pod scaledown")
}
return success, nil
}, stopCh)
}, ctx.Done())

if lastErr != nil {
return fmt.Errorf("failed to probe: %w", lastErr)
Expand All @@ -302,23 +304,29 @@ func probeQueueHealthPath(port, timeoutSeconds int, env config) error {
func main() {
flag.Parse()

// Parse the environment.
var env config
if err := envconfig.Process("", &env); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

// If this is set, we run as a standalone binary to probe the queue-proxy.
if *readinessProbeTimeout >= 0 {
if err := probeQueueHealthPath(env.QueueServingPort, *readinessProbeTimeout, env); err != nil {
// Parse the environment.
var env probeConfig
if err := envconfig.Process("", &env); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
if err := probeQueueHealthPath(*readinessProbeTimeout, env); err != nil {
// used instead of the logger to produce a concise event message
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
os.Exit(0)
}

// Parse the environment.
var env config
if err := envconfig.Process("", &env); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

// Setup the logger.
logger, _ = pkglogging.NewLogger(env.ServingLoggingConfig, env.ServingLoggingLevel)
logger = logger.Named("queueproxy")
Expand Down
21 changes: 10 additions & 11 deletions cmd/queue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,15 @@ func TestProbeHandler(t *testing.T) {
}

func TestProbeQueueInvalidPort(t *testing.T) {
const port = 0 // invalid port

if err := probeQueueHealthPath(port, 1, config{}); err == nil {
if err := probeQueueHealthPath(1, probeConfig{QueueServingPort: 0}); err == nil {
t.Error("Expected error, got nil")
} else if diff := cmp.Diff(err.Error(), "port must be a positive value, got 0"); diff != "" {
t.Errorf("Unexpected not ready message: %s", diff)
}
}

func TestProbeQueueConnectionFailure(t *testing.T) {
port := 12345 // some random port (that's not listening)

if err := probeQueueHealthPath(port, 1, config{}); err == nil {
if err := probeQueueHealthPath(1, probeConfig{QueueServingPort: 12345}); err == nil {
t.Error("Expected error, got nil")
}
}
Expand All @@ -205,7 +201,7 @@ func TestProbeQueueNotReady(t *testing.T) {
t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err)
}

err = probeQueueHealthPath(port, 1, config{})
err = probeQueueHealthPath(1, probeConfig{QueueServingPort: port})

if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" {
t.Errorf("Unexpected not ready message: %s", diff)
Expand Down Expand Up @@ -235,7 +231,7 @@ func TestProbeQueueReady(t *testing.T) {
t.Fatalf("Failed to convert port(%s) to int: %v", u.Port(), err)
}

if err = probeQueueHealthPath(port, 1, config{}); err != nil {
if err = probeQueueHealthPath(1, probeConfig{QueueServingPort: port}); err != nil {
t.Errorf("probeQueueHealthPath(%d, 1s) = %s", port, err)
}

Expand Down Expand Up @@ -276,7 +272,10 @@ func TestProbeFailFast(t *testing.T) {
f.Close()

start := time.Now()
if err = probeQueueHealthPath(port, 1 /*seconds*/, config{DownwardAPILabelsPath: f.Name()}); err == nil {
if err = probeQueueHealthPath(1 /*seconds*/, probeConfig{
QueueServingPort: port,
DownwardAPILabelsPath: f.Name(),
}); err == nil {
t.Error("probeQueueHealthPath did not fail")
}

Expand Down Expand Up @@ -307,7 +306,7 @@ func TestProbeQueueTimeout(t *testing.T) {
}

timeout := 1
if err = probeQueueHealthPath(port, timeout, config{}); err == nil {
if err = probeQueueHealthPath(timeout, probeConfig{QueueServingPort: port}); err == nil {
t.Errorf("Expected probeQueueHealthPath(%d, %v) to return timeout error", port, timeout)
}

Expand Down Expand Up @@ -341,7 +340,7 @@ func TestProbeQueueDelayedReady(t *testing.T) {
}

timeout := 0
if err := probeQueueHealthPath(port, timeout, config{}); err != nil {
if err := probeQueueHealthPath(timeout, probeConfig{QueueServingPort: port}); err != nil {
t.Errorf("probeQueueHealthPath(%d) = %s", port, err)
}
}
Expand Down