Skip to content

Commit

Permalink
Add OFSwitch connection check to Agent's liveness probes
Browse files Browse the repository at this point in the history
This helps automatic recovery if some issues cause OFSwitch reconnection to
not work properly.

It also fixes a race condition between the IsConnected and SwitchConnected
methods of OFBridge.

For #4092

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Aug 24, 2022
1 parent 3d5c617 commit 2071cbc
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 57 deletions.
12 changes: 6 additions & 6 deletions build/charts/antrea/templates/agent/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
4 changes: 1 addition & 3 deletions build/images/scripts/container_liveness_probe
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

source daemon_status

if [ $1 == agent ]; then
exit 0
elif [ $1 == ovs ]; then
if [ $1 == ovs ]; then
check_ovs_status && exit 0
elif [ $1 == ovs-ipsec ]; then
check_ovs_ipsec_status && exit 0
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4082,12 +4082,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4084,12 +4084,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4081,12 +4081,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4101,12 +4101,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4081,12 +4081,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
11 changes: 9 additions & 2 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/options"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -756,13 +757,19 @@ func run(o *Options) error {
if o.nodeType == config.ExternalNode {
bindAddress = ipv4Localhost
}
secureServing := options.NewSecureServingOptions().WithLoopback()
secureServing.BindAddress = bindAddress
secureServing.BindPort = o.config.APIPort
authentication := options.NewDelegatingAuthenticationOptions()
authorization := options.NewDelegatingAuthorizationOptions().WithAlwaysAllowPaths("/healthz", "/livez", "/readyz")
apiServer, err := apiserver.New(
agentQuerier,
networkPolicyController,
mcastController,
externalIPController,
bindAddress,
o.config.APIPort,
secureServing,
authentication,
authorization,
*o.config.EnablePrometheusMetrics,
o.config.ClientConnection.Kubeconfig,
cipherSuites,
Expand Down
46 changes: 34 additions & 12 deletions pkg/agent/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,21 @@ func installAPIGroup(s *genericapiserver.GenericAPIServer, aq agentquerier.Agent
}

// New creates an APIServer for running in antrea agent.
func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, mq querier.AgentMulticastInfoQuerier, seipq querier.ServiceExternalIPStatusQuerier,
bindAddress net.IP, bindPort int, enableMetrics bool, kubeconfig string, cipherSuites []uint16, tlsMinVersion uint16, v4Enabled, v6Enabled bool) (*agentAPIServer, error) {
cfg, err := newConfig(npq, bindAddress, bindPort, enableMetrics, kubeconfig)
func New(aq agentquerier.AgentQuerier,
npq querier.AgentNetworkPolicyInfoQuerier,
mq querier.AgentMulticastInfoQuerier,
seipq querier.ServiceExternalIPStatusQuerier,
secureServing *genericoptions.SecureServingOptionsWithLoopback,
authentication *genericoptions.DelegatingAuthenticationOptions,
authorization *genericoptions.DelegatingAuthorizationOptions,
enableMetrics bool,
kubeconfig string,
cipherSuites []uint16,
tlsMinVersion uint16,
v4Enabled,
v6Enabled bool,
) (*agentAPIServer, error) {
cfg, err := newConfig(aq, npq, secureServing, authentication, authorization, enableMetrics, kubeconfig)
if err != nil {
return nil, err
}
Expand All @@ -118,11 +130,14 @@ func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier
return &agentAPIServer{GenericAPIServer: s}, nil
}

func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bindPort int, enableMetrics bool, kubeconfig string) (*genericapiserver.CompletedConfig, error) {
secureServing := genericoptions.NewSecureServingOptions().WithLoopback()
authentication := genericoptions.NewDelegatingAuthenticationOptions()
authorization := genericoptions.NewDelegatingAuthorizationOptions().WithAlwaysAllowPaths("/healthz", "/livez", "/readyz")

func newConfig(aq agentquerier.AgentQuerier,
npq querier.AgentNetworkPolicyInfoQuerier,
secureServing *genericoptions.SecureServingOptionsWithLoopback,
authentication *genericoptions.DelegatingAuthenticationOptions,
authorization *genericoptions.DelegatingAuthorizationOptions,
enableMetrics bool,
kubeconfig string,
) (*genericapiserver.CompletedConfig, error) {
// kubeconfig file is useful when antrea-agent isn't running as a Pod.
if len(kubeconfig) > 0 {
authentication.RemoteKubeConfigFile = kubeconfig
Expand All @@ -132,8 +147,6 @@ func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bi
// Set the PairName but leave certificate directory blank to generate in-memory by default.
secureServing.ServerCert.CertDirectory = ""
secureServing.ServerCert.PairName = Name
secureServing.BindAddress = bindAddress
secureServing.BindPort = bindPort

if err := secureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1"), net.IPv6loopback}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
Expand Down Expand Up @@ -164,13 +177,22 @@ func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bi
}
serverConfig.EnableMetrics = enableMetrics
// Add readiness probe to check the status of watchers.
check := healthz.NamedCheck("watcher", func(_ *http.Request) error {
watcherCheck := healthz.NamedCheck("watcher", func(_ *http.Request) error {
if npq.GetControllerConnectionStatus() {
return nil
}
return fmt.Errorf("some watchers may not be connected")
})
serverConfig.ReadyzChecks = append(serverConfig.ReadyzChecks, check)
serverConfig.ReadyzChecks = append(serverConfig.ReadyzChecks, watcherCheck)
// Add liveness probe to check the connection with OFSwitch.
// This helps automatic recovery if some issues cause OFSwitch reconnection to not work properly, e.g. issue #4092.
ovsConnCheck := healthz.NamedCheck("ovs", func(_ *http.Request) error {
if aq.GetOpenflowClient().IsConnected() {
return nil
}
return fmt.Errorf("disconnected from OFSwitch")
})
serverConfig.LivezChecks = append(serverConfig.LivezChecks, ovsConnCheck)

completedServerCfg := serverConfig.Complete(nil)
return &completedServerCfg, nil
Expand Down
162 changes: 162 additions & 0 deletions pkg/agent/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"crypto/tls"
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apiserver/pkg/server/options"

"antrea.io/antrea/pkg/agent/config"
oftest "antrea.io/antrea/pkg/agent/openflow/testing"
aqtest "antrea.io/antrea/pkg/agent/querier/testing"
queriertest "antrea.io/antrea/pkg/querier/testing"
"antrea.io/antrea/pkg/version"
)

// fakeAgentAPIServer pairs the agent API server under test with the mocks it
// was built from, so individual test cases can program expectations on them.
type fakeAgentAPIServer struct {
	// The server under test; embedded so its methods and fields are
	// reachable directly on the fake.
	*agentAPIServer

	// Mocks kept alongside the server for use by test cases.
	ofClient     *oftest.MockClient
	agentQuerier *aqtest.MockAgentQuerier
	npQuerier    *queriertest.MockAgentNetworkPolicyInfoQuerier
}

// newFakeAPIServer constructs an agent API server wired to gomock mocks and
// returns it together with those mocks.
//
// The server is created through New with a throwaway kubeconfig file, a
// temporary token path, and secure serving bound to 127.0.0.1:10000. Any
// failure during setup aborts the calling test.
//
// The returned mocks are meant to receive further expectations from the
// caller, so the gomock controller is finished when the test ends rather than
// when this helper returns.
func newFakeAPIServer(t *testing.T) *fakeAgentAPIServer {
	t.Helper()

	// New takes a kubeconfig path; write a minimal one to a temporary file
	// that is removed again as soon as this helper returns.
	kubeConfigFile, err := os.CreateTemp("", "kubeconfig")
	require.NoError(t, err)
	defer func() {
		kubeConfigFile.Close()
		os.Remove(kubeConfigFile.Name())
	}()
	_, err = kubeConfigFile.Write([]byte(`
apiVersion: v1
kind: Config
clusters:
- cluster:
server: http://localhost:56789
name: cluster
contexts:
- context:
cluster: cluster
name: cluster
current-context: cluster
`))
	require.NoError(t, err)

	// Redirect the package-level TokenPath to a temporary file while the
	// server is being constructed, and restore it afterwards.
	originalTokenPath := TokenPath
	tokenFile, err := os.CreateTemp("", "token")
	require.NoError(t, err)
	TokenPath = tokenFile.Name()
	defer func() {
		TokenPath = originalTokenPath
		tokenFile.Close()
		os.Remove(tokenFile.Name())
	}()

	version.Version = "v1.2.3"

	ctrl := gomock.NewController(t)
	// Finish must not run before the test is over: callers add expectations
	// to the returned mocks after this helper returns, and a controller that
	// has already been finished does not report those as missing calls.
	t.Cleanup(ctrl.Finish)

	agentQuerier := aqtest.NewMockAgentQuerier(ctrl)
	agentQuerier.EXPECT().GetNodeConfig().Return(&config.NodeConfig{OVSBridge: "br-int"})
	npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
	ofClient := oftest.NewMockClient(ctrl)
	// The OpenFlow client may be fetched any number of times; it always
	// resolves to the mock client returned to the caller.
	agentQuerier.EXPECT().GetOpenflowClient().AnyTimes().Return(ofClient)

	secureServing := options.NewSecureServingOptions().WithLoopback()
	secureServing.BindAddress = net.ParseIP("127.0.0.1")
	secureServing.BindPort = 10000
	authentication := options.NewDelegatingAuthenticationOptions()
	// InClusterLookup is skipped when testing, otherwise it would always fail as there is no real cluster.
	authentication.SkipInClusterLookup = true
	authorization := options.NewDelegatingAuthorizationOptions().WithAlwaysAllowPaths("/healthz", "/livez", "/readyz")

	apiServer, err := New(agentQuerier, npQuerier, nil, nil, secureServing, authentication, authorization, true, kubeConfigFile.Name(), nil, tls.VersionTLS10, true, true)
	require.NoError(t, err)

	return &fakeAgentAPIServer{
		agentAPIServer: apiServer,
		agentQuerier:   agentQuerier,
		npQuerier:      npQuerier,
		ofClient:       ofClient,
	}
}

// TestAPIServerLivezCheck verifies that the /livez endpoint follows the value
// reported by the OpenFlow client's IsConnected: 200 with body "ok" when it
// returns true, and 500 with the per-check report when it returns false.
func TestAPIServerLivezCheck(t *testing.T) {
	tests := []struct {
		name string
		// setupFunc programs the mock expectations for this case.
		setupFunc    func(*fakeAgentAPIServer)
		expectedCode int
		expectedBody string
	}{
		{
			name: "ovs connected",
			setupFunc: func(server *fakeAgentAPIServer) {
				server.ofClient.EXPECT().IsConnected().Return(true)
			},
			expectedCode: http.StatusOK,
			expectedBody: "ok",
		},
		{
			name: "ovs disconnected",
			setupFunc: func(server *fakeAgentAPIServer) {
				server.ofClient.EXPECT().IsConnected().Return(false)
			},
			expectedCode: http.StatusInternalServerError,
			expectedBody: `[+]ping ok
[+]log ok
[-]ovs failed: reason withheld
[+]poststarthook/max-in-flight-filter ok
livez check failed
`,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			server := newFakeAPIServer(t)
			stopCh := make(chan struct{})
			defer close(stopCh)
			go func() {
				server.Run(stopCh)
			}()

			// Wait until /healthz reports "ok" so that the checks installed by
			// default are known to pass before /livez is inspected. Abort the
			// case if that never happens: the /livez assertions below would
			// only produce misleading secondary failures.
			require.Eventuallyf(t, func() bool {
				response := getResponse(server, "/healthz")
				return response.Body.String() == "ok"
			}, time.Second*5, time.Millisecond*100, "APIServer didn't become healthy in 5 seconds")

			tt.setupFunc(server)
			response := getResponse(server, "/livez")
			assert.Equal(t, tt.expectedBody, response.Body.String())
			assert.Equal(t, tt.expectedCode, response.Code)
		})
	}
}

// getResponse issues a GET request for query directly against the API
// server's HTTP handler, without going through a network connection, and
// returns the recorder holding the response.
//
// It panics if the request cannot be constructed from query.
func getResponse(apiserver *fakeAgentAPIServer, query string) *httptest.ResponseRecorder {
	req, err := http.NewRequest(http.MethodGet, query, nil)
	if err != nil {
		// No *testing.T is available here. Panicking with the real error is
		// clearer than letting a nil request crash inside the handler.
		panic(err)
	}
	recorder := httptest.NewRecorder()
	apiserver.GenericAPIServer.Handler.ServeHTTP(recorder, req)
	return recorder
}
Loading

0 comments on commit 2071cbc

Please sign in to comment.