From 9d1be1e86b6339e0c2cf1df247c22c5b89717b7d Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Wed, 17 Aug 2022 17:32:26 +0800 Subject: [PATCH] Add OFSwitch connection check to Agent's liveness probes This helps automatic recovery if some issues cause OFSwitch reconnection to not work properly. It also fixes a race condition between the IsConnected and SwitchConnected methods of OFBridge. For #4092 Signed-off-by: Quan Tian --- .../antrea/templates/agent/daemonset.yaml | 12 +- build/images/scripts/container_liveness_probe | 4 +- build/yamls/antrea-aks.yml | 12 +- build/yamls/antrea-eks.yml | 12 +- build/yamls/antrea-gke.yml | 12 +- build/yamls/antrea-ipsec.yml | 12 +- build/yamls/antrea.yml | 12 +- pkg/agent/apiserver/apiserver.go | 25 ++- pkg/agent/apiserver/apiserver_test.go | 153 ++++++++++++++++++ pkg/ovs/openflow/ofctrl_bridge.go | 21 ++- pkg/ovs/openflow/ofctrl_bridge_test.go | 87 ++++++++++ 11 files changed, 315 insertions(+), 47 deletions(-) create mode 100644 pkg/agent/apiserver/apiserver_test.go create mode 100644 pkg/ovs/openflow/ofctrl_bridge_test.go diff --git a/build/charts/antrea/templates/agent/daemonset.yaml b/build/charts/antrea/templates/agent/daemonset.yaml index acbb97a60a9..ae2b70d2dc7 100644 --- a/build/charts/antrea/templates/agent/daemonset.yaml +++ b/build/charts/antrea/templates/agent/daemonset.yaml @@ -175,12 +175,12 @@ spec: name: api protocol: TCP livenessProbe: - exec: - command: - - /bin/sh - - -c - - container_liveness_probe agent - initialDelaySeconds: 5 + httpGet: + host: localhost + path: /livez + port: api + scheme: HTTPS + initialDelaySeconds: 10 timeoutSeconds: 5 periodSeconds: 10 failureThreshold: 5 diff --git a/build/images/scripts/container_liveness_probe b/build/images/scripts/container_liveness_probe index d2571ddb7f6..b780b0a7325 100755 --- a/build/images/scripts/container_liveness_probe +++ b/build/images/scripts/container_liveness_probe @@ -16,9 +16,7 @@ source daemon_status -if [ $1 == agent ]; then - exit 0 -elif [ $1 == ovs ]; then +if [ $1 == ovs ]; then check_ovs_status && exit 0 elif [ $1 == ovs-ipsec ]; then check_ovs_ipsec_status && exit 0 diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 61a516c8632..d282b22f6f5 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -4082,12 +4082,12 @@ spec: name: api protocol: TCP livenessProbe: - exec: - command: - - /bin/sh - - -c - - container_liveness_probe agent - initialDelaySeconds: 5 + httpGet: + host: localhost + path: /livez + port: api + scheme: HTTPS + initialDelaySeconds: 10 timeoutSeconds: 5 periodSeconds: 10 failureThreshold: 5 diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 3f494f9f57f..2a1a98a1d20 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -4084,12 +4084,12 @@ spec: name: api protocol: TCP livenessProbe: - exec: - command: - - /bin/sh - - -c - - container_liveness_probe agent - initialDelaySeconds: 5 + httpGet: + host: localhost + path: /livez + port: api + scheme: HTTPS + initialDelaySeconds: 10 timeoutSeconds: 5 periodSeconds: 10 failureThreshold: 5 diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 7e1ffc5630a..7be90cc21d3 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -4081,12 +4081,12 @@ spec: name: api protocol: TCP livenessProbe: - exec: - command: - - /bin/sh - - -c - - container_liveness_probe agent - initialDelaySeconds: 5 + httpGet: + host: localhost + path: /livez + port: api + scheme: HTTPS + initialDelaySeconds: 10 timeoutSeconds: 5 periodSeconds: 10 failureThreshold: 5 diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index f7ce222434b..fa029e2e4e5 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -4101,12 +4101,12 @@ spec: name: api protocol: TCP livenessProbe: - exec: - command: - - /bin/sh - - -c - - container_liveness_probe agent - initialDelaySeconds: 5 + httpGet: + host: localhost + path: /livez + port: api + scheme: HTTPS + initialDelaySeconds: 10 timeoutSeconds: 5 periodSeconds: 10 failureThreshold: 5 diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index cbd2077ee51..6fe4de91738 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -4081,12 +4081,12 @@ spec: name: api protocol: TCP livenessProbe: - exec: - command: - - /bin/sh - - -c - - container_liveness_probe agent - initialDelaySeconds: 5 + httpGet: + host: localhost + path: /livez + port: api + scheme: HTTPS + initialDelaySeconds: 10 timeoutSeconds: 5 periodSeconds: 10 failureThreshold: 5 diff --git a/pkg/agent/apiserver/apiserver.go b/pkg/agent/apiserver/apiserver.go index 9abf80cfa9a..5f53e393fb1 100644 --- a/pkg/agent/apiserver/apiserver.go +++ b/pkg/agent/apiserver/apiserver.go @@ -101,7 +101,7 @@ func installAPIGroup(s *genericapiserver.GenericAPIServer, aq agentquerier.Agent // New creates an APIServer for running in antrea agent. func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier, mq querier.AgentMulticastInfoQuerier, seipq querier.ServiceExternalIPStatusQuerier, bindAddress net.IP, bindPort int, enableMetrics bool, kubeconfig string, cipherSuites []uint16, tlsMinVersion uint16, v4Enabled, v6Enabled bool) (*agentAPIServer, error) { - cfg, err := newConfig(npq, bindAddress, bindPort, enableMetrics, kubeconfig) + cfg, err := newConfig(aq, npq, bindAddress, bindPort, enableMetrics, kubeconfig, false) if err != nil { return nil, err } @@ -118,7 +118,13 @@ func New(aq agentquerier.AgentQuerier, npq querier.AgentNetworkPolicyInfoQuerier return &agentAPIServer{GenericAPIServer: s}, nil } -func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bindPort int, enableMetrics bool, kubeconfig string) (*genericapiserver.CompletedConfig, error) { +func newConfig(aq agentquerier.AgentQuerier, + npq querier.AgentNetworkPolicyInfoQuerier, + bindAddress net.IP, + bindPort int, + enableMetrics bool, + kubeconfig string, + skipInClusterLookup bool) (*genericapiserver.CompletedConfig, error) { secureServing := genericoptions.NewSecureServingOptions().WithLoopback() authentication := genericoptions.NewDelegatingAuthenticationOptions() authorization := genericoptions.NewDelegatingAuthorizationOptions().WithAlwaysAllowPaths("/healthz", "/livez", "/readyz") @@ -128,6 +134,8 @@ func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bi authentication.RemoteKubeConfigFile = kubeconfig authorization.RemoteKubeConfigFile = kubeconfig } + // InClusterLookup is skipped when testing, otherwise it would always fail as there is no real cluster. + authentication.SkipInClusterLookup = skipInClusterLookup // Set the PairName but leave certificate directory blank to generate in-memory by default. secureServing.ServerCert.CertDirectory = "" @@ -164,13 +172,22 @@ func newConfig(npq querier.AgentNetworkPolicyInfoQuerier, bindAddress net.IP, bi } serverConfig.EnableMetrics = enableMetrics // Add readiness probe to check the status of watchers. - check := healthz.NamedCheck("watcher", func(_ *http.Request) error { + watcherCheck := healthz.NamedCheck("watcher", func(_ *http.Request) error { if npq.GetControllerConnectionStatus() { return nil } return fmt.Errorf("some watchers may not be connected") }) - serverConfig.ReadyzChecks = append(serverConfig.ReadyzChecks, check) + serverConfig.ReadyzChecks = append(serverConfig.ReadyzChecks, watcherCheck) + // Add liveness probe to check the connection with OFSwitch. + // This helps automatic recovery if some issues cause OFSwitch reconnection to not work properly, e.g. issue #4092. + ovsConnCheck := healthz.NamedCheck("ovs", func(_ *http.Request) error { + if aq.GetOpenflowClient().IsConnected() { + return nil + } + return fmt.Errorf("disconnected from OFSwitch") + }) + serverConfig.LivezChecks = append(serverConfig.LivezChecks, ovsConnCheck) completedServerCfg := serverConfig.Complete(nil) return &completedServerCfg, nil diff --git a/pkg/agent/apiserver/apiserver_test.go b/pkg/agent/apiserver/apiserver_test.go new file mode 100644 index 00000000000..90c4d61ab8f --- /dev/null +++ b/pkg/agent/apiserver/apiserver_test.go @@ -0,0 +1,153 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apiserver + +import ( + "net" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + genericapiserver "k8s.io/apiserver/pkg/server" + + oftest "antrea.io/antrea/pkg/agent/openflow/testing" + aqtest "antrea.io/antrea/pkg/agent/querier/testing" + queriertest "antrea.io/antrea/pkg/querier/testing" + "antrea.io/antrea/pkg/version" +) + +type fakeAgentAPIServer struct { + *agentAPIServer + agentQuerier *aqtest.MockAgentQuerier + npQuerier *queriertest.MockAgentNetworkPolicyInfoQuerier + ofClient *oftest.MockClient +} + +func newFakeAPIServer(t *testing.T) *fakeAgentAPIServer { + f, err := os.CreateTemp("", "authkubeconfig") + if err != nil { + t.Fatal(err) + } + defer func() { + f.Close() + os.Remove(f.Name()) + }() + if _, err := f.Write([]byte(` +apiVersion: v1 +kind: Config +clusters: +- cluster: + server: http://localhost:56789 + name: cluster +contexts: +- context: + cluster: cluster + name: cluster +current-context: cluster +`)); err != nil { + t.Fatal(err) + } + originalTokenPath := TokenPath + tokenFile, err := os.CreateTemp("", "tokenFile") + TokenPath = tokenFile.Name() + defer func() { + TokenPath = originalTokenPath + tokenFile.Close() + os.Remove(tokenFile.Name()) + }() + version.Version = "v1.2.3" + ctrl := gomock.NewController(t) + defer ctrl.Finish() + agentQuerier := aqtest.NewMockAgentQuerier(ctrl) + npQuerier := queriertest.NewMockAgentNetworkPolicyInfoQuerier(ctrl) + ofClient := oftest.NewMockClient(ctrl) + agentQuerier.EXPECT().GetOpenflowClient().AnyTimes().Return(ofClient) + cfg, err := newConfig(agentQuerier, npQuerier, net.ParseIP("127.0.0.1"), 10000, true, f.Name(), true) + require.NoError(t, err) + s, err := cfg.New(Name, genericapiserver.NewEmptyDelegate()) + require.NoError(t, err) + agentAPIServer := &agentAPIServer{GenericAPIServer: s} + fakeAPIServer := &fakeAgentAPIServer{ + agentAPIServer: agentAPIServer, + agentQuerier: agentQuerier, + npQuerier: npQuerier, + ofClient: ofClient, + } + return fakeAPIServer +} + +func TestAPIServerLivezCheck(t *testing.T) { + tests := []struct { + name string + setupFunc func(*fakeAgentAPIServer) + expectedCode int + expectedBody string + }{ + { + name: "ovs connected", + setupFunc: func(apiserver *fakeAgentAPIServer) { + apiserver.ofClient.EXPECT().IsConnected().Return(true) + }, + expectedCode: 200, + expectedBody: "ok", + }, + { + name: "ovs disconnected", + setupFunc: func(apiserver *fakeAgentAPIServer) { + apiserver.ofClient.EXPECT().IsConnected().Return(false) + }, + expectedCode: 500, + expectedBody: `[+]ping ok +[+]log ok +[-]ovs failed: reason withheld +[+]poststarthook/max-in-flight-filter ok +livez check failed +`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + apiserver := newFakeAPIServer(t) + stopCh := make(chan struct{}) + defer close(stopCh) + go func() { + apiserver.Run(stopCh) + }() + // Wait for APIServer to be health so checks installed by default are ensured ok. + assert.Eventuallyf(t, func() bool { + response := getResponse(apiserver, "/healthz") + return response.Body.String() == "ok" + }, time.Second*5, time.Millisecond*100, "APIServer didn't become health in 5 seconds") + + tt.setupFunc(apiserver) + response := getResponse(apiserver, "/livez") + assert.Equal(t, tt.expectedBody, response.Body.String()) + assert.Equal(t, tt.expectedCode, response.Code) + }) + } +} + +func getResponse(apiserver *fakeAgentAPIServer, query string) *httptest.ResponseRecorder { + req, _ := http.NewRequest(http.MethodGet, query, nil) + recorder := httptest.NewRecorder() + apiserver.GenericAPIServer.Handler.ServeHTTP(recorder, req) + return recorder +} diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index 225e8f9a4e5..c2562207aab 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -185,6 +185,7 @@ type OFBridge struct { // tableCache is used to cache ofTables. tableCache map[uint8]*ofTable + ofSwitchMutex sync.RWMutex // ofSwitch is the target OFSwitch. ofSwitch *ofctrl.OFSwitch // controller helps maintain connections to remote OFSwitch. @@ -322,9 +323,13 @@ func (b *OFBridge) PacketRcvd(sw *ofctrl.OFSwitch, packet *ofctrl.PacketIn) { // SwitchConnected is a callback when the remote OFSwitch is connected. func (b *OFBridge) SwitchConnected(sw *ofctrl.OFSwitch) { klog.Infof("OFSwitch is connected: %v", sw.DPID()) - // initialize tables. - b.ofSwitch = sw + func() { + b.ofSwitchMutex.Lock() + defer b.ofSwitchMutex.Unlock() + b.ofSwitch = sw + }() b.ofSwitch.EnableMonitor() + // initialize tables. b.initialize() go func() { // b.connected is nil if it is an automatic reconnection but not triggered by OFSwitch.Connect. @@ -429,7 +434,15 @@ func (b *OFBridge) DeleteFlowsByCookie(cookieID, cookieMask uint64) error { } func (b *OFBridge) IsConnected() bool { - return b.ofSwitch.IsReady() + sw := func() *ofctrl.OFSwitch { + b.ofSwitchMutex.RLock() + defer b.ofSwitchMutex.RUnlock() + return b.ofSwitch + }() + if sw == nil { + return false + } + return sw.IsReady() } func (b *OFBridge) AddFlowsInBundle(addflows []Flow, modFlows []Flow, delFlows []Flow) error { @@ -755,7 +768,7 @@ func (b *OFBridge) processTableFeatures(ch chan *openflow15.MultipartReply) { } } -func NewOFBridge(br string, mgmtAddr string) Bridge { +func NewOFBridge(br string, mgmtAddr string) *OFBridge { s := &OFBridge{ bridgeName: br, mgmtAddr: mgmtAddr, diff --git a/pkg/ovs/openflow/ofctrl_bridge_test.go b/pkg/ovs/openflow/ofctrl_bridge_test.go new file mode 100644 index 00000000000..e69b454000b --- /dev/null +++ b/pkg/ovs/openflow/ofctrl_bridge_test.go @@ -0,0 +1,87 @@ +// Copyright 2022 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "math/rand" + "net" + "sync" + "testing" + "time" + + "antrea.io/antrea/pkg/ovs/ovsconfig" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" +) + +type fakeConn struct{} + +func (f *fakeConn) Close() error { + return nil +} + +func (f *fakeConn) Read(b []byte) (int, error) { + return len(b), nil +} + +func (f *fakeConn) Write(b []byte) (int, error) { + return len(b), nil +} + +func (f *fakeConn) LocalAddr() net.Addr { + return nil +} + +func (f *fakeConn) RemoteAddr() net.Addr { + return nil +} + +func (f *fakeConn) SetDeadline(t time.Time) error { + return nil +} + +func (f *fakeConn) SetReadDeadline(t time.Time) error { + return nil +} + +func (f *fakeConn) SetWriteDeadline(t time.Time) error { + return nil +} + +func newFakeOFSwitch(app ofctrl.AppInterface) *ofctrl.OFSwitch { + stream := util.NewMessageStream(&fakeConn{}, nil) + dpid, _ := net.ParseMAC("01:02:03:04:05:06:07:08") + connCh := make(chan int) + sw := ofctrl.NewSwitch(stream, dpid, app, connCh, uint16(rand.Uint32())) + return sw +} + +// TestOFBridgeIsConnected verifies it's thread-safe to call OFBridge's IsConnected method. +func TestOFBridgeIsConnected(t *testing.T) { + b := NewOFBridge("test-br", GetMgmtAddress(ovsconfig.DefaultOVSRunDir, "test-br")) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + sw := newFakeOFSwitch(b) + b.SwitchConnected(sw) + }() + wg.Add(1) + go func() { + defer wg.Done() + b.IsConnected() + }() + wg.Wait() +}