Skip to content

Commit

Permalink
Add OFSwitch connection check to Agent's liveness probes
Browse files Browse the repository at this point in the history
This helps automatic recovery if some issues cause OFSwitch reconnection to
not work properly.

It also fixes a race condition between the IsConnected and SwitchConnected
methods of OFBridge.

For #4092

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed Aug 24, 2022
1 parent 3d5c617 commit 9d1be1e
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 47 deletions.
12 changes: 6 additions & 6 deletions build/charts/antrea/templates/agent/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
4 changes: 1 addition & 3 deletions build/images/scripts/container_liveness_probe
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

source daemon_status

if [ $1 == agent ]; then
exit 0
elif [ $1 == ovs ]; then
if [ $1 == ovs ]; then
check_ovs_status && exit 0
elif [ $1 == ovs-ipsec ]; then
check_ovs_ipsec_status && exit 0
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4082,12 +4082,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4084,12 +4084,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4081,12 +4081,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4101,12 +4101,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
12 changes: 6 additions & 6 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4081,12 +4081,12 @@ spec:
name: api
protocol: TCP
livenessProbe:
exec:
command:
- /bin/sh
- -c
- container_liveness_probe agent
initialDelaySeconds: 5
httpGet:
host: localhost
path: /livez
port: api
scheme: HTTPS
initialDelaySeconds: 10
timeoutSeconds: 5
periodSeconds: 10
failureThreshold: 5
Expand Down
25 changes: 21 additions & 4 deletions pkg/agent/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func installAPIGroup(s *genericapiserver.GenericAPIServer, aq agentquerier.Agent
// New creates an APIServer for running in antrea agent.
func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, mq querier.AgentMulticastInfoQuerier, seipq querier.ServiceExternalIPStatusQuerier,
bindAddress net.IP, bindPort int, enableMetrics bool, kubeconfig string, cipherSuites []uint16, tlsMinVersion uint16, v4Enabled, v6Enabled bool) (*agentAPIServer, error) {
cfg, err := newConfig(npq, bindAddress, bindPort, enableMetrics, kubeconfig)
cfg, err := newConfig(aq, npq, bindAddress, bindPort, enableMetrics, kubeconfig, false)
if err != nil {
return nil, err
}
Expand All @@ -118,7 +118,13 @@ func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier
return &agentAPIServer{GenericAPIServer: s}, nil
}

func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bindPort int, enableMetrics bool, kubeconfig string) (*genericapiserver.CompletedConfig, error) {
func newConfig(aq agentquerier.AgentQuerier,
npq querier.AgentNetworkPolicyInfoQuerier,
bindAddress net.IP,
bindPort int,
enableMetrics bool,
kubeconfig string,
skipInClusterLookup bool) (*genericapiserver.CompletedConfig, error) {
secureServing := genericoptions.NewSecureServingOptions().WithLoopback()
authentication := genericoptions.NewDelegatingAuthenticationOptions()
authorization := genericoptions.NewDelegatingAuthorizationOptions().WithAlwaysAllowPaths("/healthz", "/livez", "/readyz")
Expand All @@ -128,6 +134,8 @@ func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bi
authentication.RemoteKubeConfigFile = kubeconfig
authorization.RemoteKubeConfigFile = kubeconfig
}
// InClusterLookup is skipped when testing, otherwise it would always fail as there is no real cluster.
authentication.SkipInClusterLookup = skipInClusterLookup

// Set the PairName but leave certificate directory blank to generate in-memory by default.
secureServing.ServerCert.CertDirectory = ""
Expand Down Expand Up @@ -164,13 +172,22 @@ func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bi
}
serverConfig.EnableMetrics = enableMetrics
// Add readiness probe to check the status of watchers.
check := healthz.NamedCheck("watcher", func(_ *http.Request) error {
watcherCheck := healthz.NamedCheck("watcher", func(_ *http.Request) error {
if npq.GetControllerConnectionStatus() {
return nil
}
return fmt.Errorf("some watchers may not be connected")
})
serverConfig.ReadyzChecks = append(serverConfig.ReadyzChecks, check)
serverConfig.ReadyzChecks = append(serverConfig.ReadyzChecks, watcherCheck)
// Add liveness probe to check the connection with OFSwitch.
// This helps automatic recovery if some issues cause OFSwitch reconnection to not work properly, e.g. issue #4092.
ovsConnCheck := healthz.NamedCheck("ovs", func(_ *http.Request) error {
if aq.GetOpenflowClient().IsConnected() {
return nil
}
return fmt.Errorf("disconnected from OFSwitch")
})
serverConfig.LivezChecks = append(serverConfig.LivezChecks, ovsConnCheck)

completedServerCfg := serverConfig.Complete(nil)
return &completedServerCfg, nil
Expand Down
153 changes: 153 additions & 0 deletions pkg/agent/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2022 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
genericapiserver "k8s.io/apiserver/pkg/server"

oftest "antrea.io/antrea/pkg/agent/openflow/testing"
aqtest "antrea.io/antrea/pkg/agent/querier/testing"
queriertest "antrea.io/antrea/pkg/querier/testing"
"antrea.io/antrea/pkg/version"
)

// fakeAgentAPIServer bundles an agentAPIServer with the mocks it was built
// from, so that tests can register expectations on them after construction.
type fakeAgentAPIServer struct {
*agentAPIServer
// agentQuerier is the mock AgentQuerier handed to newConfig.
agentQuerier *aqtest.MockAgentQuerier
// npQuerier is the mock AgentNetworkPolicyInfoQuerier handed to newConfig.
npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier
// ofClient is the mock OpenFlow client that agentQuerier.GetOpenflowClient returns.
ofClient *oftest.MockClient
}

// newFakeAPIServer builds an agent API server backed by gomock mocks.
//
// It writes a throwaway kubeconfig and points the package-level TokenPath at a
// temporary token file for the duration of the call; both files are removed and
// TokenPath is restored before the function returns. The config is created
// with skipInClusterLookup set to true, as no real cluster is available in
// tests. It also overrides version.Version with a fixed value.
//
// The returned value exposes the mocks so callers can set further
// expectations. GetOpenflowClient is pre-configured to return the mock
// OpenFlow client any number of times. Any setup failure aborts the test.
func newFakeAPIServer(t *testing.T) *fakeAgentAPIServer {
	kubeconfigFile, err := os.CreateTemp("", "authkubeconfig")
	require.NoError(t, err)
	defer func() {
		kubeconfigFile.Close()
		os.Remove(kubeconfigFile.Name())
	}()
	_, err = kubeconfigFile.Write([]byte(`
apiVersion: v1
kind: Config
clusters:
- cluster:
    server: http://localhost:56789
  name: cluster
contexts:
- context:
    cluster: cluster
  name: cluster
current-context: cluster
`))
	require.NoError(t, err)

	originalTokenPath := TokenPath
	tokenFile, err := os.CreateTemp("", "tokenFile")
	// Check the error before touching tokenFile: on failure it is nil and
	// calling Name() on it would panic instead of failing the test cleanly.
	require.NoError(t, err)
	TokenPath = tokenFile.Name()
	defer func() {
		TokenPath = originalTokenPath
		tokenFile.Close()
		os.Remove(tokenFile.Name())
	}()

	version.Version = "v1.2.3"

	ctrl := gomock.NewController(t)
	// NOTE(review): Finish runs when this helper returns, i.e. before callers
	// register their own expectations on the returned mocks. Confirm those
	// later expectations are still verified; gomock releases that hook into
	// t.Cleanup may make this explicit call unnecessary.
	defer ctrl.Finish()
	agentQuerier := aqtest.NewMockAgentQuerier(ctrl)
	npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl)
	ofClient := oftest.NewMockClient(ctrl)
	agentQuerier.EXPECT().GetOpenflowClient().AnyTimes().Return(ofClient)

	cfg, err := newConfig(agentQuerier, npQuerier, net.ParseIP("127.0.0.1"), 10000, true, kubeconfigFile.Name(), true)
	require.NoError(t, err)
	s, err := cfg.New(Name, genericapiserver.NewEmptyDelegate())
	require.NoError(t, err)

	return &fakeAgentAPIServer{
		agentAPIServer: &agentAPIServer{GenericAPIServer: s},
		agentQuerier:   agentQuerier,
		npQuerier:      npQuerier,
		ofClient:       ofClient,
	}
}

// TestAPIServerLivezCheck verifies that the /livez endpoint reflects the
// OpenFlow client's connection state: 200 "ok" when IsConnected reports true,
// and 500 with a failing "ovs" check when it reports false.
func TestAPIServerLivezCheck(t *testing.T) {
	// Verbose body written by /livez when the "ovs" check is the only failure.
	const ovsFailedBody = `[+]ping ok
[+]log ok
[-]ovs failed: reason withheld
[+]poststarthook/max-in-flight-filter ok
livez check failed
`

	type livezCase struct {
		name     string
		prepare  func(*fakeAgentAPIServer)
		wantCode int
		wantBody string
	}

	cases := []livezCase{
		{
			name: "ovs connected",
			prepare: func(s *fakeAgentAPIServer) {
				s.ofClient.EXPECT().IsConnected().Return(true)
			},
			wantCode: http.StatusOK,
			wantBody: "ok",
		},
		{
			name: "ovs disconnected",
			prepare: func(s *fakeAgentAPIServer) {
				s.ofClient.EXPECT().IsConnected().Return(false)
			},
			wantCode: http.StatusInternalServerError,
			wantBody: ovsFailedBody,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			server := newFakeAPIServer(t)
			stopCh := make(chan struct{})
			defer close(stopCh)
			go server.Run(stopCh)

			// Wait until the server reports healthy, so that the checks
			// installed by default are passing before /livez is queried.
			isHealthy := func() bool {
				return getResponse(server, "/healthz").Body.String() == "ok"
			}
			assert.Eventuallyf(t, isHealthy, time.Second*5, time.Millisecond*100, "APIServer didn't become health in 5 seconds")

			// Register the IsConnected expectation only now, right before
			// the /livez request that is expected to consume it.
			tc.prepare(server)
			got := getResponse(server, "/livez")
			assert.Equal(t, tc.wantBody, got.Body.String())
			assert.Equal(t, tc.wantCode, got.Code)
		})
	}
}

// getResponse issues an in-process GET for query against the fake server's
// HTTP handler and returns the recorder holding the response.
func getResponse(apiserver *fakeAgentAPIServer, query string) *httptest.ResponseRecorder {
	rec := httptest.NewRecorder()
	// The error is deliberately discarded: callers in this file only pass
	// fixed, well-formed paths.
	request, _ := http.NewRequest(http.MethodGet, query, nil)
	handler := apiserver.GenericAPIServer.Handler
	handler.ServeHTTP(rec, request)
	return rec
}
21 changes: 17 additions & 4 deletions pkg/ovs/openflow/ofctrl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type OFBridge struct {
// tableCache is used to cache ofTables.
tableCache map[uint8]*ofTable

ofSwitchMutex sync.RWMutex
// ofSwitch is the target OFSwitch.
ofSwitch *ofctrl.OFSwitch
// controller helps maintain connections to remote OFSwitch.
Expand Down Expand Up @@ -322,9 +323,13 @@ func (b *OFBridge) PacketRcvd(sw *ofctrl.OFSwitch, packet *ofctrl.PacketIn) {
// SwitchConnected is a callback when the remote OFSwitch is connected.
func (b *OFBridge) SwitchConnected(sw *ofctrl.OFSwitch) {
klog.Infof("OFSwitch is connected: %v", sw.DPID())
// initialize tables.
b.ofSwitch = sw
func() {
b.ofSwitchMutex.Lock()
defer b.ofSwitchMutex.Unlock()
b.ofSwitch = sw
}()
b.ofSwitch.EnableMonitor()
// initialize tables.
b.initialize()
go func() {
// b.connected is nil if it is an automatic reconnection but not triggered by OFSwitch.Connect.
Expand Down Expand Up @@ -429,7 +434,15 @@ func (b *OFBridge) DeleteFlowsByCookie(cookieID, cookieMask uint64) error {
}

func (b *OFBridge) IsConnected() bool {
return b.ofSwitch.IsReady()
sw := func() *ofctrl.OFSwitch {
b.ofSwitchMutex.RLock()
defer b.ofSwitchMutex.RUnlock()
return b.ofSwitch
}()
if sw == nil {
return false
}
return sw.IsReady()
}

func (b *OFBridge) AddFlowsInBundle(addflows []Flow, modFlows []Flow, delFlows []Flow) error {
Expand Down Expand Up @@ -755,7 +768,7 @@ func (b *OFBridge) processTableFeatures(ch chan *openflow15.MultipartReply) {
}
}

func NewOFBridge(br string, mgmtAddr string) Bridge {
func NewOFBridge(br string, mgmtAddr string) *OFBridge {
s := &OFBridge{
bridgeName: br,
mgmtAddr: mgmtAddr,
Expand Down
Loading

0 comments on commit 9d1be1e

Please sign in to comment.